K8s源码阅读笔记之 API Server 生成和启动

API Server 是 k8s 最核心的管理组件,也是相对而言读起来比较简单的。

命令行

使用 cobra 框架作为命令行启动工具,通过启动时添加参数来做配置

cmd/kube-apiserver/app/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
    // Root cobra command for kube-apiserver. `s` is defined outside this excerpt
    // (presumably the server run options) and is captured by the closures below.
    cmd := &cobra.Command{
Use: "kube-apiserver",
Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,

// stop printing usage when the command errors
SilenceUsage: true,
PersistentPreRunE: func(*cobra.Command, []string) error {
// silence client-go warnings.
// kube-apiserver loopback clients should not log self-issued warnings.
rest.SetDefaultWarningHandler(rest.NoWarnings{})
return nil
},
// Core startup logic. Unlike Run, RunE can hand an error back to cobra.
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
fs := cmd.Flags()

// Activate logging as soon as possible, after that
// show flags with the final logging configuration.
if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
return err
}
cliflag.PrintFlags(fs)

// set default options
// (Complete fills in defaults for options the user left unset.)
completedOptions, err := s.Complete()
if err != nil {
return err
}

// validate options
// Reject invalid or mutually conflicting options; every problem found is
// reported together as a single aggregate error.
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
// Hand off to Run. SetupSignalHandler supplies the stop channel Run uses to
// shut the server down (presumably closed on SIGTERM/SIGINT).
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
// Positional arguments are not accepted: any non-empty argument is an error,
// since kube-apiserver is configured exclusively through flags.
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}

// Collect the flags of every option into named flag sets and attach them to
// the command's flag set; the named sets also drive the sectioned help output.
fs := cmd.Flags()
namedFlagSets := s.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}

// Wrap help/usage text to the terminal width; errors from TerminalSize are
// deliberately ignored (cols then keeps whatever value it returned).
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)

return cmd

构造好的 command 会被返回给调用方执行;RunE 最终调用的 Run 函数才是真正的启动逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Run runs the specified APIServer.  This should never exit.
//
// It turns the completed options into a server configuration, completes that
// configuration, assembles the delegation chain (aggregator -> kube-apiserver
// -> apiextensions, see CreateServerChain), prepares it and finally runs it,
// presumably until stopCh is closed. An error is returned as soon as any of
// these steps fails, or if the prepared server's Run returns one.
func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
	// Log the version and Go runtime knobs first, so they are in the log even
	// if one of the setup steps below fails.
	klog.Infof("Version: %+v", version.Get())
	klog.InfoS("Golang settings",
		"GOGC", os.Getenv("GOGC"),
		"GOMAXPROCS", os.Getenv("GOMAXPROCS"),
		"GOTRACEBACK", os.Getenv("GOTRACEBACK"),
	)

	cfg, err := NewConfig(opts)
	if err != nil {
		return err
	}

	completedCfg, err := cfg.Complete()
	if err != nil {
		return err
	}

	serverChain, err := CreateServerChain(completedCfg)
	if err != nil {
		return err
	}

	preparedChain, err := serverChain.PrepareRun()
	if err != nil {
		return err
	}

	return preparedChain.Run(stopCh)
}

其中 config.Complete() 配置补全内容非常多,这是由于 APIServer 的模块众多,相应的配置参数也非常之多,分类一下:

  • genericConfig 通用配置
  • OpenAPI配置
  • Storage (etcd)配置
  • Authentication 认证配置
  • Authorization 授权配置

Handler Chain

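其中 CreateServerChain 负责把几个 server 通过委托(delegation)串成一条处理链。链是从后往前构建的:每个 server 都把前一个构建好的 server 作为自己的 delegationTarget,自己处理不了的请求就交给它。因此请求的处理顺序是:

Aggregator Server -> KubeAPIServer(早期称为 Master,代码中为 ControlPlane) -> APIExtensions Server -> Not Found Handler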

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// CreateServerChain creates the apiservers connected via delegation.
//
// The servers are constructed back to front: each one receives the server
// built before it as its delegation target. At request time the order is
// therefore aggregator -> kube-apiserver -> apiextensions-apiserver ->
// notFoundHandler. The aggregator, the head of the chain, is returned.
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
	// Tail of the chain: only reached when no server above claimed the request.
	notFound := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
	tail := genericapiserver.NewEmptyDelegateWithCustomHandler(notFound)

	// apiextensions-apiserver: serves CustomResourceDefinitions (CRDs).
	extensionsServer, err := config.ApiExtensions.New(tail)
	if err != nil {
		return nil, err
	}
	crdResource := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
	crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(crdResource)

	// kube-apiserver proper: serves the built-in API objects and delegates
	// everything else to the extensions server.
	kubeAPIServer, err := config.ControlPlane.New(extensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

	// aggregator comes last in the chain: it is built last, sees requests first,
	// and forwards them to kube-apiserver or to the aggregated API servers.
	aggregator, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, extensionsServer.Informers, crdAPIEnabled)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}
	return aggregator, nil
}

API Server 的 Instance 是通过 config.ControlPlane.New(apiExtensionsServer.GenericAPIServer) 实现的

pkg/controlplane/instance.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// New builds the kube-apiserver Instance on top of delegationTarget.
// Only the parts related to REST storage registration are shown; "..." marks elided code.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
...
// m is the Instance that New returns; s is the GenericAPIServer created in the elided code above.
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}

...

// Build the REST storage provider for the legacy core API group, which
// InstallAPIs installs under /api rather than /apis.
legacyRESTStorageProvider, err := corerest.New(corerest.Config{
GenericConfig: corerest.GenericConfig{
StorageFactory: c.ExtraConfig.StorageFactory,
EventTTL: c.ExtraConfig.EventTTL,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
Informers: c.ExtraConfig.VersionedInformers,
},
Proxy: corerest.ProxyConfig{
Transport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
},
Services: corerest.ServicesConfig{
ClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
NodePortRange: c.ExtraConfig.ServiceNodePortRange,
IPRepairInterval: c.ExtraConfig.RepairServicesInterval,
},
})
if err != nil {
return nil, err
}

...

// The order here is preserved in discovery.
// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps" and "deployments.extensions"),
// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
// This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go`
// with specific priorities.
// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
// handlers that we have.
// The legacy (core) provider comes first; every other provider serves a named API group.
restStorageProviders := []RESTStorageProvider{
legacyRESTStorageProvider,
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, DiscoveryClient: discoveryClientForAdmissionRegistration},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
resourcerest.RESTStorageProvider{},
}
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
...
}

API 是按 group/version 组织的,资源类型只是 URL 路径中的一个参数,因此需要把每个 group 对应的 Object 及其存储注册进来,再按 group 分别安装路由、针对性处理。

实现都在 InstallAPIs 里面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// InstallAPIs builds the REST storage of every provider and registers it with
// the GenericAPIServer. The legacy core group (empty group name) is installed
// under /api as soon as it is built. All named groups are collected first and
// then installed under /apis in a single InstallAPIGroups call after the loop.
//
// Returns the first error from building a provider's storage or from installing
// a group.
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
	// Named (non-legacy) groups; installed together once every provider is processed.
	nonLegacy := []*genericapiserver.APIGroupInfo{}
	...

	for _, restStorageBuilder := range restStorageProviders {
		groupName := restStorageBuilder.GroupName()
		// Build the APIGroupInfo for this provider's group only.
		apiGroupInfo, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
		if err != nil {
			return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
		}

		...
		if len(groupName) == 0 {
			// the legacy group for core APIs is special that it is installed into /api via this special install method.
			if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
				return fmt.Errorf("error in registering legacy API: %w", err)
			}
		} else {
			// everything else goes to /apis
			nonLegacy = append(nonLegacy, &apiGroupInfo)
		}
	}

	// Only after the loop: install every named group under /apis in one call.
	// (Doing this inside the loop would return after the first provider.)
	if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}

具体资源的定义是通过包导入的方式注册的。cmd/kube-apiserver/app/server.go 导入了 "k8s.io/kubernetes/pkg/controlplane",而 pkg/controlplane/import_known_versions.go 以匿名导入(blank import)的方式引入各 API 组的 install 包,借助这些包的 init 函数完成注册:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import (
// These imports are the API groups the API server will support.
_ "k8s.io/kubernetes/pkg/apis/admission/install"
_ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
_ "k8s.io/kubernetes/pkg/apis/apiserverinternal/install"
_ "k8s.io/kubernetes/pkg/apis/apps/install"
_ "k8s.io/kubernetes/pkg/apis/authentication/install"
_ "k8s.io/kubernetes/pkg/apis/authorization/install"
_ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/certificates/install"
_ "k8s.io/kubernetes/pkg/apis/coordination/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
_ "k8s.io/kubernetes/pkg/apis/discovery/install"
_ "k8s.io/kubernetes/pkg/apis/events/install"
_ "k8s.io/kubernetes/pkg/apis/extensions/install"
_ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
_ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
_ "k8s.io/kubernetes/pkg/apis/networking/install"
_ "k8s.io/kubernetes/pkg/apis/node/install"
_ "k8s.io/kubernetes/pkg/apis/policy/install"
_ "k8s.io/kubernetes/pkg/apis/rbac/install"
_ "k8s.io/kubernetes/pkg/apis/resource/install"
_ "k8s.io/kubernetes/pkg/apis/scheduling/install"
_ "k8s.io/kubernetes/pkg/apis/storage/install"
)

例如

1
2
3
4
5
6
7
8
9
10
11
12
// init registers the events API group into the shared legacyscheme.Scheme as a
// side effect of importing this package.
func init() {
	Install(legacyscheme.Scheme)
}

// Install registers the API group and adds types to a scheme.
//
// The internal version is registered together with the external versions
// v1beta1 and v1. Then v1 is given priority over v1beta1. Any failure panics
// through utilruntime.Must.
func Install(scheme *runtime.Scheme) {
	registrations := []func(*runtime.Scheme) error{
		events.AddToScheme, // internal version
		v1beta1.AddToScheme,
		v1.AddToScheme,
	}
	for _, register := range registrations {
		utilruntime.Must(register(scheme))
	}
	utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
}

scheme 导入到 API Server 中

pkg/api/legacyscheme/scheme.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Package-level scheme and codecs into which the built-in API groups register
// their types (the install packages' init functions call Install(legacyscheme.Scheme)).
var (
// Scheme is the default instance of runtime.Scheme to which types in the Kubernetes API are already registered.
// NOTE: If you are copying this file to start a new api group, STOP! Copy the
// extensions group instead. This Scheme is special and should appear ONLY in
// the api group, unless you really know what you're doing.
// TODO(lavalamp): make the above error impossible.
Scheme = runtime.NewScheme()

// Codecs provides access to encoding and decoding for the scheme
// (it is built from Scheme, so it covers the types registered there).
Codecs = serializer.NewCodecFactory(Scheme)

// ParameterCodec handles versioning of objects that are converted to query parameters.
ParameterCodec = runtime.NewParameterCodec(Scheme)
)

Scheme

Scheme 注册表,定义了序列化和反序列化 API 对象的方法,用于将组、版本和类型信息转换为 Go 模式,并在不同版本之间进行映射。

Scheme 是随时间演进的、带版本的 API 与配置的基础。

在一个 Scheme 中,Type 是特定的 Go 结构体,Version 是表示该 Type 特定表现形式的时点标识符(通常向后兼容),Kind 是该 Version 内 Type 的唯一名称,Group 标识一组随时间演变的 Versions、Kinds 和 Types。

Unversioned Type 指尚未正式绑定到某个版本、但承诺向后兼容的类型(实际上相当于一个预期将来不会破坏兼容性的 "v1" 类型)。

Scheme 在运行时不应被修改,并且只有在注册完成之后才是线程安全的。

总的来说,Scheme 是一个结构体,内含处理外部 Version 之间的转换、以及 GVK 与 Go Type 之间转换所需的数据和方法。

vendor/k8s.io/apimachinery/pkg/runtime/scheme.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Scheme defines methods for serializing and deserializing API objects, a type
// registry for converting group, version, and kind information to and from Go
// schemas, and mappings between Go schemas of different versions. A scheme is the
// foundation for a versioned API and versioned configuration over time.
//
// In a Scheme, a Type is a particular Go struct, a Version is a point-in-time
// identifier for a particular representation of that Type (typically backwards
// compatible), a Kind is the unique name for that Type within the Version, and a
// Group identifies a set of Versions, Kinds, and Types that evolve over time. An
// Unversioned Type is one that is not yet formally bound to a type and is promised
// to be backwards compatible (effectively a "v1" of a Type that does not expect
// to break in the future).
//
// Schemes are not expected to change at runtime and are only threadsafe after
// registration is complete.
type Scheme struct {
// gvkToType allows one to figure out the go type of an object with
// the given version and name.
// Each GVK maps to exactly one Go type.
gvkToType map[schema.GroupVersionKind]reflect.Type

// typeToGVK allows one to find metadata for a given go object.
// The reflect.Type we index by should *not* be a pointer.
// Unlike gvkToType, a single Go type may be registered under several GVKs.
typeToGVK map[reflect.Type][]schema.GroupVersionKind

// unversionedTypes are transformed without conversion in ConvertToVersion.
// (Types registered without a version; see AddUnversionedTypes.)
unversionedTypes map[reflect.Type]schema.GroupVersionKind

// unversionedKinds are the names of kinds that can be created in the context of any group
// or version
// TODO: resolve the status of unversioned types.
unversionedKinds map[string]reflect.Type

// Map from version and resource to the corresponding func to convert
// resource field labels in that version to internal version.
fieldLabelConversionFuncs map[schema.GroupVersionKind]FieldLabelConversionFunc

// defaulterFuncs is a map to funcs to be called with an object to provide defaulting
// the provided object must be a pointer.
// (Keyed by Go type; see AddTypeDefaultingFunc and Default.)
defaulterFuncs map[reflect.Type]func(interface{})

// converter stores all registered conversion functions. It also has
// default converting behavior.
// (Version-to-version conversion funcs are added via AddConversionFunc /
// AddGeneratedConversionFunc.)
converter *conversion.Converter

// versionPriority is a map of groups to ordered lists of versions for those groups indicating the
// default priorities of these versions as registered in the scheme
versionPriority map[string][]string

// observedVersions keeps track of the order we've seen versions during type registration
observedVersions []schema.GroupVersion

// schemeName is the name of this scheme. If you don't specify a name, the stack of the NewScheme caller will be used.
// This is useful for error reporting to indicate the origin of the scheme.
schemeName string
}
1
2
3
4
5
6
7
// GroupVersionKind unambiguously identifies a kind.  It doesn't anonymously include GroupVersion
// to avoid automatic coercion. It doesn't use a GroupVersion to avoid custom marshalling
type GroupVersionKind struct {
// Group is the API group name; the legacy core group has an empty name.
Group string
// Version is the API version within the group, e.g. "v1" or "v1beta1".
Version string
// Kind is the type's name within that group version, e.g. "Deployment".
Kind string
}

官方通常用缩写 GVR(GroupVersionResource)来描述一个资源在 REST API 中的明确位置(类似于绝对路径),例如 deployment 对应的 GVR 是 apps/v1/deployments;其 status 子资源的路径则为 .../deployments/{name}/status。

同理,GVK(GroupVersionKind) 锚定资源的明确所属类型,在项目代码中也经常用到。

一般来说,Scheme 的用法是:把 json/yaml 数据按某一个外部版本反序列化,再转换成内部版本;输出时再从内部版本转换成所需的外部版本。

因此,一个由 v1 版本创建的 API Object,也可以被使用其他版本(例如 v2)的客户端操作。

方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Methods of runtime.Scheme (signatures only, receivers omitted).
Converter() *Converter
AddUnversionedTypes(version GroupVersion, types ...Object)
// The two registration methods: AddKnownTypes registers objects under gv and
// derives each Kind from the object's Go type via reflection, while
// AddKnownTypeWithName is given the full GVK explicitly.
AddKnownTypes(gv GroupVersion, types ...Object)
AddKnownTypeWithName(gvk GroupVersionKind, obj Object)

// Lookups between GVKs and Go types.
KnownTypes(gv GroupVersion) map[string]reflect.Type
VersionsForGroupKind(gk GroupKind) []GroupVersion
AllKnownTypes() map[GroupVersionKind]reflect.Type
ObjectKinds(obj Object) ([]GroupVersionKind, bool, error)
Recognizes(gvk GroupVersionKind) bool
IsUnversioned(obj Object) (bool, bool)
New(kind GroupVersionKind) (Object, error)
// Conversion and defaulting registration.
AddIgnoredConversionType(from interface{}, to interface{}) error
AddConversionFunc(a interface{}, b interface{}, fn ConversionFunc) error
AddGeneratedConversionFunc(a interface{}, b interface{}, fn ConversionFunc) error
AddFieldLabelConversionFunc(gvk GroupVersionKind, conversionFunc FieldLabelConversionFunc) error
AddTypeDefaultingFunc(srcType Object, fn func(interface{}))
// Applying defaults and conversions.
Default(src Object)
Convert(in interface{}, out interface{}, context interface{}) error
ConvertFieldLabel(gvk GroupVersionKind, label string, value string) (string, string, error)
ConvertToVersion(in Object, target GroupVersioner) (Object, error)
UnsafeConvertToVersion(in Object, target GroupVersioner) (Object, error)
// Version priority and registration queries.
SetVersionPriority(versions ...GroupVersion) error
PrioritizedVersionsForGroup(group string) []GroupVersion
PrioritizedVersionsAllGroups() []GroupVersion
PreferredVersionAllGroups() []GroupVersion
IsGroupRegistered(group string) bool
IsVersionRegistered(version GroupVersion) bool
Name() string

注入的过程,例如:

vendor/k8s.io/api/apps/v1/register.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// SchemeBuilder collects the functions (here addKnownTypes) that register this
// group version's types; AddToScheme runs them all against a given scheme.
var (
// TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api.
// localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
localSchemeBuilder = &SchemeBuilder
AddToScheme = localSchemeBuilder.AddToScheme
)

// addKnownTypes registers every apps/v1 kind, each paired with its List type,
// under SchemeGroupVersion, and then calls metav1.AddToGroupVersion for the
// same group version. It never fails; the error result only exists to match
// the function signature that runtime.NewSchemeBuilder expects.
func addKnownTypes(scheme *runtime.Scheme) error {
	kinds := []runtime.Object{
		&Deployment{}, &DeploymentList{},
		&StatefulSet{}, &StatefulSetList{},
		&DaemonSet{}, &DaemonSetList{},
		&ReplicaSet{}, &ReplicaSetList{},
		&ControllerRevision{}, &ControllerRevisionList{},
	}
	scheme.AddKnownTypes(SchemeGroupVersion, kinds...)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

注入的内容就是 kubernetes 里面的 Object

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// NOTE: external apps/v1 type. TypeMeta (apiVersion/kind) is inlined into the
// top-level JSON object; the other fields map to "metadata", "spec" and "status".

// Deployment enables declarative updates for Pods and ReplicaSets.
type Deployment struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

// Specification of the desired behavior of the Deployment.
// +optional
Spec DeploymentSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

// Most recently observed status of the Deployment.
// +optional
Status DeploymentStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

通过 Builder 的设计模式,构建 Object:

image-20240503221253955

Object

Runtime库被称为运行时,几乎是所有程序的核心库,在k8s中也不例外。

k8s的运行时实现在 vendor/k8s.io/apimachinery/pkg/runtime/ 路径下。

其中,runtime.Object 是 K8s 中所有资源类型的基石。它是一个 interface(而不是 struct),所有资源对象都实现了它的方法(GetObjectKind 和 DeepCopyObject),因此资源对象可以作为通用的 runtime.Object 传递,需要时再通过类型断言转换回具体的资源类型。

vendor/k8s.io/apimachinery/pkg/runtime/interfaces.go

1
2
3
4
5
6
7
8
// Object interface must be supported by all API types registered with Scheme. Since objects in a scheme are
// expected to be serialized to the wire, the interface an Object must provide to the Scheme allows
// serializers to set the kind, version, and group the object is represented as. An Object may choose
// to return a no-op ObjectKindAccessor in cases where it is not expected to be serialized.
type Object interface {
// GetObjectKind returns the accessor through which serializers read or set
// the object's group/version/kind.
GetObjectKind() schema.ObjectKind
// DeepCopyObject returns a deep copy of the object, typed as an Object.
DeepCopyObject() Object
}

vendor/k8s.io/apimachinery/pkg/runtime/schema/interfaces.go

1
2
3
4
5
6
7
8
9
10
11
// All objects that are serialized from a Scheme encode their type information. This interface is used
// by serialization to set type information from the Scheme onto the serialized version of an object.
// For objects that cannot be serialized or have unique requirements, this interface may be a no-op.
type ObjectKind interface {
// SetGroupVersionKind sets or clears the intended serialized kind of an object. Passing kind nil
// should clear the current setting.
// (kind is passed by value, so "nil" here presumably means the zero GroupVersionKind{}.)
SetGroupVersionKind(kind GroupVersionKind)
// GroupVersionKind returns the stored group, version, and kind of an object, or an empty struct
// if the object does not expose or provide these fields.
GroupVersionKind() GroupVersionKind
}

APIServer对资源的描述支持多种格式,分别对应不同的Serializer。

vendor/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go

1
2
3
4
5
6
7
8
9
// CodecFactory provides methods for retrieving codecs and serializers for specific
// versions and content types.
type CodecFactory struct {
// scheme is the type registry the codecs encode, decode and convert against.
scheme *runtime.Scheme
// universal is a decoder across the supported formats (NOTE(review): exact
// format detection is not visible here — confirm in NewCodecFactory).
universal runtime.Decoder
// accepts describes the serializers offered, one entry per supported media type.
accepts []runtime.SerializerInfo

// legacySerializer: NOTE(review): its role is not visible in this excerpt.
legacySerializer runtime.Serializer
}

从 yaml 序列化器的代码可以看到,它只是对 JSON 序列化器的一层包装:解码时先用 yaml.ToJSON 把 YAML 转换为 JSON,再交给内嵌的 JSON 序列化器处理;除 Decode 外的其余方法(包括编码)都直接沿用内嵌的序列化器。

vendor/k8s.io/apimachinery/pkg/runtime/serializer/yaml/yaml.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// yamlSerializer converts YAML passed to the Decoder methods to JSON.
// Only Decode is overridden; every other Serializer method comes from the
// embedded serializer, which must understand JSON.
type yamlSerializer struct {
	// the nested serializer
	runtime.Serializer
}

// yamlSerializer implements Serializer
var _ runtime.Serializer = yamlSerializer{}

// NewDecodingSerializer adds YAML decoding support to a serializer that supports JSON.
func NewDecodingSerializer(jsonSerializer runtime.Serializer) runtime.Serializer {
	wrapped := &yamlSerializer{Serializer: jsonSerializer}
	return wrapped
}

// Decode converts data from YAML to JSON and passes the JSON, together with
// gvk and into, to the nested serializer. If the YAML conversion fails, its
// error is returned with nil results.
func (c yamlSerializer) Decode(data []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
	jsonData, convErr := yaml.ToJSON(data)
	if convErr != nil {
		return nil, nil, convErr
	}
	return c.Serializer.Decode(jsonData, gvk, into)
}

Resource

在K8s的设计中,resource 是其最基础、最重要的概念,也是最小的管理单位,所有的管理对象都承载在一个个 resource 实例上。为了实现这些 resource 的复杂管理逻辑,又进一步将它们分组化、版本化,依照逻辑层次形成了 Group、Version、Kind、Resource 这几个核心概念(代码中对应 GroupVersionKind、GroupVersionResource 等结构体):

  • Group:资源组,也称 APIGroup,常见的有 core(代码中组名为空字符串)、apps、extensions 等,外部类型定义在 vendor/k8s.io/api 目录下
  • Version:资源版本,也称 APIVersion,常见的有 v1、v1beta1。同一个资源可能有多个 Version,这些 Version 之间有优先级之分,例如 deployment 既属于 apps/v1,又属于 extensions/v1beta1;在不同 k8s 版本中,version 的优先级可能会变化
  • Kind:资源种类,描述资源的类别,例如pod类别、svc类别等
  • Resource:资源,也称 APIResource,是 Kind 在 REST API 中的名称(通常为小写复数形式,例如 Kind Deployment 对应 resource deployments),每个具体对象都是该资源的一个实例
  • SubResource:子资源,部分资源拥有子资源,例如 Deployment 拥有 status 子资源(.../deployments/{name}/status)
  • CRD: Custom Resource Definitions,用户自定义资源类型

资源操作方式:

概念层面,每种resource都有对应的管理操作方法,目前支持的有这几种:

  • get
  • list
  • watch
  • create
  • update
  • patch
  • delete
  • deletecollection
  • proxy

除 proxy 外,这些操作大致可以归为增、删、改、查四类(get、list、watch 都属于查)。

落实到代码里,资源支持的操作方法记录在 metav1.APIResource 的 Verbs 字段中(其类型 metav1.Verbs 实际上就是 []string):

vendor/k8s.io/apimachinery/pkg/apis/meta/v1/types.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// NOTE: excerpt — only the Verbs field is shown; the other fields are elided.

// APIResource specifies the name of a resource and whether it is namespaced.
type APIResource struct {
...
// verbs is a list of supported kube verbs (this includes get, list, watch, create,
// update, patch, delete, deletecollection, and proxy)
Verbs Verbs `json:"verbs" protobuf:"bytes,4,opt,name=verbs"`
...
}

// Verbs masks the value so protobuf can generate
//
// +protobuf.nullable=true
// +protobuf.options.(gogoproto.goproto_stringer)=false
type Verbs []string

// String renders the verbs exactly like a plain []string would be printed,
// e.g. "[get list watch]"; a nil or empty Verbs renders as "[]".
// The conversion to []string is essential: formatting vs itself would invoke
// this String method again and recurse forever.
func (vs Verbs) String() string {
	plain := []string(vs)
	return fmt.Sprint(plain)
}

使用[]string结构来描述资源所对应的操作,而[]string终归只是描述,需要与实际的存储资源CRUD操作关联。

Internal Version 注册方式

在k8s的设计中,资源版本分外部版本(external)和内部版本(internal)之分,外部版本(例如v1/v1beta1/v1beta2)提供给外部使用,而对应的内部版本仅在APIServer内部使用。

区分内外版本的作用:

  • 提供不同版本之间的转换功能,例如从v1beta1–>v1的过程实际是v1beta1–> internal –>v1,转换函数会注册到scheme表中
  • 减少复杂度,方便版本维护,避免维护多个版本的对应代码,实际APIServer端处理的都是转换后的内部版本
  • 不同外部版本资源之间的字段/功能可能存在些许差异,而内部版本包含所有版本的字段/功能,这为它作为外部资源版本之间转换的桥梁提供了基础。

内部版本和外部版本对于资源结构体定义显著的区别是,内部版本是不带json和proto标签的,因为其不需要结构化提供给外部。

以向 APIServer 发起创建一个资源实例为例:APIServer 首先从 HTTP 路径中获取对应的外部 version,使用 scheme 以该外部 version 创建一个空对象并把请求体解码进去,然后将该对象转换成内部版本对应的对象结构,之后的处理都基于内部版本;持久化写入 etcd 时,对象会再被编码为存储版本(storage version,它本身也是一个外部版本)。

而在查询请求中,从 etcd 读出的对象会先被解码为内部版本,再转换为请求路径对应的外部版本,响应给请求端。

image.png

内部版本的代码:

pkg/apis/apps/types.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// NOTE: internal (in-memory) apps StatefulSet used inside the API server.
// Unlike the external apps/v1 type, its fields carry no json/protobuf tags,
// because it is never serialized directly — it is converted to/from an
// external version first.
type StatefulSet struct {
metav1.TypeMeta
// +optional
metav1.ObjectMeta

// Spec defines the desired identities of pods in this set.
// +optional
Spec StatefulSetSpec

// Status is the current status of Pods in this StatefulSet. This data
// may be out of date by some window of time.
// +optional
Status StatefulSetStatus
}

pkg/apis/apps/install/install.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// init registers the apps API group into legacyscheme.Scheme whenever this
// package is imported.
func init() {
	Install(legacyscheme.Scheme)
}

// Install registers the API group and adds types to a scheme.
//
// It registers the internal apps version and the external versions v1beta1,
// v1beta2 and v1. Then it sets their priority to v1 > v1beta2 > v1beta1. Any
// failure panics through utilruntime.Must.
func Install(scheme *runtime.Scheme) {
	for _, addToScheme := range []func(*runtime.Scheme) error{
		apps.AddToScheme, // internal version
		v1beta1.AddToScheme,
		v1beta2.AddToScheme,
		v1.AddToScheme,
	} {
		utilruntime.Must(addToScheme(scheme))
	}
	utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta2.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
}

pkg/apis/apps/register.go

1
2
3
4
5
6
// Registration entry points for the internal apps types.
var (
// SchemeBuilder stores functions to add things to a scheme.
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme applies all stored functions to a scheme.
AddToScheme = SchemeBuilder.AddToScheme
)

External Version 注册方式

外部版本的代码路径为

vendor/k8s.io/api/apps/v1/types.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// NOTE: external apps/v1 type; its json/protobuf struct tags define the wire format.

// StatefulSet represents a set of pods with consistent identities.
// Identities are defined as:
// - Network: A single stable DNS and hostname.
// - Storage: As many VolumeClaims as requested.
//
// The StatefulSet guarantees that a given network identity will always
// map to the same storage identity.
type StatefulSet struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

// Spec defines the desired identities of pods in this set.
// +optional
Spec StatefulSetSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

// Status is the current status of Pods in this StatefulSet. This data
// may be out of date by some window of time.
// +optional
Status StatefulSetStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

例如 V1,与 Internal Version 共用 install.go

pkg/apis/apps/v1/register.go

1
2
3
4
5
6
// init adds the hand-written defaulting functions (addDefaultingFuncs, defined
// elsewhere in this package) to localSchemeBuilder.
func init() {
// We only register manually written functions here. The registration of the
// generated functions takes place in the generated files. The separation
// makes the code compile even when the generated files are missing.
localSchemeBuilder.Register(addDefaultingFuncs)
}

以及通过代码生成的方式注册

pkg/apis/apps/v1/zz_generated.conversion.go

1
2
3
// init (generated) adds the generated conversion functions, via
// RegisterConversions, to localSchemeBuilder.
func init() {
localSchemeBuilder.Register(RegisterConversions)
}

代码生成方式(这些 +k8s 标记写在 pkg/apis/apps/v1/doc.go 中,由 conversion-gen / defaulter-gen 读取):

1
2
3
4
5
6
// +k8s:conversion-gen=k8s.io/kubernetes/pkg/apis/apps
// +k8s:conversion-gen-external-types=k8s.io/api/apps/v1
// +k8s:defaulter-gen=TypeMeta
// +k8s:defaulter-gen-input=k8s.io/api/apps/v1

package v1 // import "k8s.io/kubernetes/pkg/apis/apps/v1"

使用代码生成的原因:

  1. 每个外部版本都需要与内部版本互相转换并设置默认值,这类代码量大且高度重复
  2. Go 语言没有继承机制,无法通过基类复用这些逻辑,只能为每个类型逐一生成

例如:

pkg/apis/apps/v1/zz_generated.conversion.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// autoConvert_v1_ControllerRevision_To_apps_ControllerRevision (generated) converts
// external v1 -> internal: ObjectMeta and Revision are copied directly, while Data
// is converted from runtime.RawExtension to runtime.Object. On a Data conversion
// error it returns immediately, before Revision is copied.
func autoConvert_v1_ControllerRevision_To_apps_ControllerRevision(in *v1.ControllerRevision, out *apps.ControllerRevision, s conversion.Scope) error {
out.ObjectMeta = in.ObjectMeta
if err := runtime.Convert_runtime_RawExtension_To_runtime_Object(&in.Data, &out.Data, s); err != nil {
return err
}
out.Revision = in.Revision
return nil
}

// autoConvert_apps_ControllerRevision_To_v1_ControllerRevision (generated) is the
// reverse direction, internal -> external v1: Data goes from runtime.Object back
// to runtime.RawExtension.
func autoConvert_apps_ControllerRevision_To_v1_ControllerRevision(in *apps.ControllerRevision, out *v1.ControllerRevision, s conversion.Scope) error {
out.ObjectMeta = in.ObjectMeta
if err := runtime.Convert_runtime_Object_To_runtime_RawExtension(&in.Data, &out.Data, s); err != nil {
return err
}
out.Revision = in.Revision
return nil
}

pkg/apis/apps/v1/zz_generated.defaults.go

1
2
3
4
// SetObjectDefaults_Deployment (generated) applies SetDefaults_Deployment and then
// the core pod-spec defaults to the Deployment's pod template; the rest of the
// body is elided in this excerpt.
func SetObjectDefaults_Deployment(in *v1.Deployment) {
SetDefaults_Deployment(in)
corev1.SetDefaults_PodSpec(&in.Spec.Template.Spec)
...

pkg/apis/apps/v1/defaults.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// SetDefaults_Deployment sets additional defaults compared to its counterpart
// in extensions. These addons are:
// - MaxUnavailable during rolling update set to 25% (1 in extensions)
// - MaxSurge value during rolling update set to 25% (1 in extensions)
// - RevisionHistoryLimit set to 10 (not set in extensions)
// - ProgressDeadlineSeconds set to 600s (not set in extensions)
//
// In addition, Replicas defaults to 1 and an empty strategy type defaults to
// RollingUpdate. Only nil/empty fields are filled in; values the user already
// set are never overwritten.
func SetDefaults_Deployment(obj *appsv1.Deployment) {
	spec := &obj.Spec

	if spec.Replicas == nil {
		replicas := int32(1)
		spec.Replicas = &replicas
	}

	strategy := &spec.Strategy
	if strategy.Type == "" {
		strategy.Type = appsv1.RollingUpdateDeploymentStrategyType
	}
	// Rolling-update parameters are only defaulted for the RollingUpdate strategy.
	if strategy.Type == appsv1.RollingUpdateDeploymentStrategyType {
		if strategy.RollingUpdate == nil {
			strategy.RollingUpdate = &appsv1.RollingUpdateDeployment{}
		}
		rolling := strategy.RollingUpdate
		if rolling.MaxUnavailable == nil {
			maxUnavailable := intstr.FromString("25%")
			rolling.MaxUnavailable = &maxUnavailable
		}
		if rolling.MaxSurge == nil {
			maxSurge := intstr.FromString("25%")
			rolling.MaxSurge = &maxSurge
		}
	}

	if spec.RevisionHistoryLimit == nil {
		revisionHistoryLimit := int32(10)
		spec.RevisionHistoryLimit = &revisionHistoryLimit
	}
	if spec.ProgressDeadlineSeconds == nil {
		progressDeadlineSeconds := int32(600)
		spec.ProgressDeadlineSeconds = &progressDeadlineSeconds
	}
}

Generic Server

提供暴露 http 服务所需基础设施。将操作 API Object 的 restful 服务暴露出去。

vendor/k8s.io/apiserver/pkg/server/genericapiserver.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// GenericAPIServer contains state for a Kubernetes cluster api server.
// (Excerpt: only the handler and the lifecycle-hook fields are shown.)
type GenericAPIServer struct {

...

// "Outputs"
// Handler holds the handlers being used by this API server
// (this is the server's HTTP handler).
Handler *APIServerHandler

// Lifecycle hooks: post-start hooks and pre-shutdown hooks, each map guarded
// by its own mutex.
// PostStartHooks are each called after the server has started listening, in a separate go func for each
// with no guarantee of ordering between them. The map key is a name used for error reporting.
// It may kill the process with a panic if it wishes to by returning an error.
postStartHookLock sync.Mutex
postStartHooks map[string]postStartHookEntry
postStartHooksCalled bool
disabledPostStartHooks sets.String

preShutdownHookLock sync.Mutex
preShutdownHooks map[string]preShutdownHookEntry
preShutdownHooksCalled bool
...
}

一些重要方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
UnprotectedHandler() http.Handler
PostStartHooks() map[string]postStartHookEntry
PreShutdownHooks() map[string]preShutdownHookEntry
HealthzChecks() []HealthChecker
ListedPaths() []string
NextDelegate() DelegationTarget
RegisterMuxAndDiscoveryCompleteSignal(signalName string, signal <-chan struct{}) error
MuxAndDiscoveryCompleteSignals() map[string]<-chan struct{}
RegisterDestroyFunc(destroyFn func())
Destroy()
PrepareRun() preparedGenericAPIServer
InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error
InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error
InstallAPIGroup(apiGroupInfo *APIGroupInfo) error
AddHealthChecks(checks ...HealthChecker) error
AddBootSequenceHealthChecks(checks ...HealthChecker) error
AddReadyzChecks(checks ...HealthChecker) error
AddLivezChecks(delay time.Duration, checks ...HealthChecker) error
AddPostStartHook(name string, hook PostStartHookFunc) error
AddPostStartHookOrDie(name string, hook PostStartHookFunc)
AddPreShutdownHook(name string, hook PreShutdownHookFunc) error
AddPreShutdownHookOrDie(name string, hook PreShutdownHookFunc)
RunPostStartHooks(stopCh <-chan struct{})
RunPreShutdownHooks() error

构建 Request Handler

pkg/controlplane/instance.go

1
2
3
4
5
6
7
8
// New builds the kube-apiserver Instance (excerpt). Its GenericAPIServer is
// created under the name "kube-apiserver" and chained to delegationTarget.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	...
	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
	if err != nil {
		return nil, err
	}
	...
}

vendor/k8s.io/apiserver/pkg/server/config.go

1
2
3
4
// New creates a GenericAPIServer (excerpt). The HTTP handler is assembled by
// NewAPIServerHandler. The delegation target's UnprotectedHandler is passed in
// as the notFoundHandler argument.
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	...
	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
}

vendor/k8s.io/apiserver/pkg/server/handler.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// NewAPIServerHandler assembles the APIServerHandler for the server called name.
//
// It creates two routers:
//   - a PathRecorderMux for non-go-restful routes, which falls back to
//     notFoundHandler when that handler is non-nil;
//   - a go-restful Container for the REST API routes, with panic recovery and
//     service errors rendered through the serializer s.
//
// Both routers are combined in a director. handlerChainBuilder wraps the
// director to produce FullHandlerChain.
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	plainMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		plainMux.NotFoundHandler(notFoundHandler)
	}

	restContainer := restful.NewContainer()
	restContainer.ServeMux = http.NewServeMux()
	// CurlyRouter understands templates such as proxy/{kind}/{name}/{*}.
	restContainer.Router(restful.CurlyRouter{})
	restContainer.RecoverHandler(func(reason interface{}, w http.ResponseWriter) {
		logStackOnRecover(s, reason, w)
	})
	restContainer.ServiceErrorHandler(func(svcErr restful.ServiceError, req *restful.Request, resp *restful.Response) {
		serviceErrorHandler(s, svcErr, req, resp)
	})

	d := director{
		name:               name,
		goRestfulContainer: restContainer,
		nonGoRestfulMux:    plainMux,
	}

	return &APIServerHandler{
		FullHandlerChain:   handlerChainBuilder(d),
		GoRestfulContainer: restContainer,
		NonGoRestfulMux:    plainMux,
		Director:           d,
	}
}

vendor/k8s.io/apiserver/pkg/server/handler.go

1
2
3
4
// ServeHTTP implements http.Handler by passing every request to the fully
// assembled filter chain.
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	chain := a.FullHandlerChain
	chain.ServeHTTP(w, r)
}

k8s选用的Restful框架是go-restful,简单说明一下go-restful的结构,辅助后面对于APIServer工作流程的理解。

go-restful 的层级结构概念自顶向下依次为:

  • Container: 一个Container就是一个独立的http server,可拥有独立的地址端口组合(类似nginx的server层级)
  • WebService: 大粒度的分类,某一类别的服务可归属到同一个WebService中,其下包含多个Route
  • Route: 每个Route对应具体的uri路径,将该路径路由到对应的handler函数上

基本上跟 gin 框架的概念差不多:mux 相当于 Container,WebService 相当于 gin 中的 group(路由分组),Route 则对应具体处理请求的 handler。

再看 handlerChainBuilder,构建出的 Chain

vendor/k8s.io/apiserver/pkg/server/config.go

1
2
3
4
5
6
7
// New (excerpt): handlerChainBuilder is a closure that applies the configured
// BuildHandlerChainFunc to a handler, passing c.Config along.
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	...
	handlerChainBuilder := func(handler http.Handler) http.Handler {
		return c.BuildHandlerChainFunc(handler, c.Config)
	}
	...
}

cmd/kube-apiserver/app/aggregator.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// createAggregatorConfig (excerpt): the aggregator's generic config uses
// BuildHandlerChainWithStorageVersionPrecondition as its handler-chain builder.
func createAggregatorConfig(
	kubeAPIServerConfig genericapiserver.Config,
	commandOptions controlplaneapiserver.CompletedOptions,
	externalInformers kubeexternalinformers.SharedInformerFactory,
	serviceResolver aggregatorapiserver.ServiceResolver,
	proxyTransport *http.Transport,
	peerProxy utilpeerproxy.Interface,
	pluginInitializers []admission.PluginInitializer,
) (*aggregatorapiserver.Config, error) {
	...
	genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
	...
}

vendor/k8s.io/apiserver/pkg/server/config.go

1
2
3
4
5
// BuildHandlerChainWithStorageVersionPrecondition wraps apiHandler in the
// storage-version precondition filter, then applies DefaultBuildHandlerChain on
// top of it.
//
// The precondition filter needs WithRequestInfo to have run first.
// DefaultBuildHandlerChain installs WithRequestInfo as an outer filter, so
// placing the precondition filter innermost satisfies that ordering.
func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c *Config) http.Handler {
	return DefaultBuildHandlerChain(
		genericapifilters.WithStorageVersionPrecondition(apiHandler, c.StorageVersionManager, c.Serializer),
		c,
	)
}

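DefaultBuildHandlerChain 中依次包装了一系列 filter:认证、鉴权、审计、限流(APF / MaxInFlight)、超时、跨域、Retry-After、panic recovery 等。注意后包装的 filter 位于外层,会先处理请求: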

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// DefaultBuildHandlerChain wraps apiHandler in the standard kube-apiserver
// filter chain and returns the outermost handler.
//
// Each statement wraps the current handler, so the filter applied LAST becomes
// the OUTERMOST one. An incoming request therefore passes through the filters
// in the reverse of the order written below: WithAuditInit first, then
// WithPanicRecovery, ..., and authorization last before reaching apiHandler.
// The filterlatency.TrackStarted/TrackCompleted pairs bracket individual
// filters (authorization, priority-and-fairness, impersonation, audit,
// authentication), presumably to measure per-filter latency.
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := apiHandler

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authorization")

	// Request concurrency control: API Priority and Fairness when c.FlowControl
	// is configured, otherwise the simpler max-in-flight limiter.
	if c.FlowControl != nil {
		workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
			c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
		handler = filterlatency.TrackCompleted(handler)
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator, c.RequestTimeout/4)
		handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
	} else {
		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	}

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "impersonation")

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit")

	// failedHandler answers requests that fail authentication; it is wrapped
	// so that such failures are audited as well.
	failedHandler := genericapifilters.Unauthorized(c.Serializer)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)

	failedHandler = filterlatency.TrackCompleted(failedHandler)
	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences, c.Authentication.RequestHeaderConfig)
	handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authentication")

	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")

	// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
	// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)

	handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
		c.LongRunningFunc, c.Serializer, c.RequestTimeout)
	// Shutdown-related filters: in-flight request tracking via wait groups and,
	// when configured, watch termination during shutdown.
	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.NonLongRunningRequestWaitGroup)
	if c.ShutdownWatchTerminationGracePeriod > 0 {
		handler = genericfilters.WithWatchTerminationDuringShutdown(handler, c.lifecycleSignals, c.WatchRequestWaitGroup)
	}
	// GOAWAY is only added for HTTP/2-capable secure serving with a positive chance.
	if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
		handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
	}
	handler = genericapifilters.WithWarningRecorder(handler)
	handler = genericapifilters.WithCacheControl(handler)
	handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
	if c.ShutdownSendRetryAfter {
		handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
	}
	handler = genericfilters.WithHTTPLogging(handler)
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
		handler = genericapifilters.WithTracing(handler, c.TracerProvider)
	}
	handler = genericapifilters.WithLatencyTrackers(handler)
	// WithRequestInfo is outside every filter added above, so those inner
	// filters run after request info has been resolved.
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
	handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
	handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
	// Outermost filter: runs first for every request.
	handler = genericapifilters.WithAuditInit(handler)
	return handler
}

OpenAPI 与 Generic Server

k8s 通过生成 swagger 文档,定义对外提供的 restful service,客户端可以依照该规定来向 api server 发 http 请求。

hack/update-openapi-spec.sh

生成的 swagger 在 api/openapi-spec/swagger.json,里面可以分为六个部分:

  • definitions:定义 yaml 文档的格式
  • info
  • paths:定义支持的方法
  • security
  • securityDefinitions
  • swagger:描述 swagger 文档版本

生成定义的方法:

例如,在 staging/src/k8s.io/api/apps/v1/doc.go 中,定义

1
2
3
4
5
// +k8s:deepcopy-gen=package
// +k8s:protobuf-gen=package
// +k8s:openapi-gen=true

package v1 // import "k8s.io/api/apps/v1"

这些标记(特别是 +k8s:openapi-gen=true)会让代码生成器为当前包中定义的 types(例如 staging/src/k8s.io/api/apps/v1/types.go)生成对应的 OpenAPI 定义,生成出来的文件是 pkg/generated/openapi/zz_generated.openapi.go

在这个文件中,可以找到当前 kubernetes 版本中所有内建 API Object 的 openAPI definition,并且每个“外部版本 + API Object”的组合都会有一个 Definition。

这里的 key 和 swagger json 中的 definition id 有一对一映射关系

staging/src/k8s.io/apiserver/pkg/endpoints/openapi/openapi.go

1
2
3
4
5
6
7
8
9
// GetDefinitionName maps a Go type name to its OpenAPI definition name.
// When the type is registered with one or more GroupVersionKinds, those GVKs
// are also returned as an extension under extensionGVK; otherwise the
// extensions are nil.
func (d *DefinitionNamer) GetDefinitionName(name string) (string, spec.Extensions) {
	gvks, registered := d.typeGroupVersionKinds[name]
	if !registered {
		return friendlyName(name), nil
	}
	return friendlyName(name), spec.Extensions{extensionGVK: gvks.JSON()}
}

这些Generated code的作用:为每一个以字符串(就是上面那个key)标识的api object, 生成其 openapi definition, definition的主体是一个json schema,该 schema定义了这个api object的 json 表示中可以有哪些属性,属性类型信息等等。

在主函数 run 起来之前,首先会创建配置

cmd/kube-apiserver/app/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// CreateKubeAPIServerConfig (excerpt) builds the kube-apiserver configuration.
// The generated OpenAPI definitions (generatedopenapi.GetOpenAPIDefinitions)
// are passed into BuildGenericConfig; this is how they are injected into the
// server config.
func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
	*controlplane.Config,
	aggregatorapiserver.ServiceResolver,
	[]admission.PluginInitializer,
	error,
) {
	proxyTransport := CreateProxyTransport()

	genericConfig, versionedInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig(
		opts.CompletedOptions,
		// Schemes for the core, apiextensions and aggregator API types.
		[]*runtime.Scheme{legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme},
		generatedopenapi.GetOpenAPIDefinitions,
	)
	if err != nil {
		return nil, nil, nil, err
	}
	...
}

可以猜想到上面的 OpenAPIDefinitions 会以配置的方式注入。

在预启动时,也就是主函数中的 PrepareRun 方法

vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

1
2
3
4
5
// PrepareRun (excerpt): the aggregator prepares its embedded GenericAPIServer.
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
	...
	prepared := s.GenericAPIServer.PrepareRun()
	...
}

vendor/k8s.io/apiserver/pkg/server/genericapiserver.go

1
2
3
4
5
6
7
8
9
10
11
// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates.
// (Excerpt.) When an OpenAPI config is present and installation is not
// skipped, the OpenAPI v2 spec is installed on the handler's go-restful
// container and non-go-restful mux.
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
	// Prepare the rest of the delegation chain first.
	s.delegationTarget.PrepareRun()

	if s.openAPIConfig != nil && !s.skipOpenAPIInstallation {
		s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{
			Config: s.openAPIConfig,
		}.InstallV2(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
	}
	...
}

vendor/k8s.io/apiserver/pkg/server/routes/openapi.go

1
2
3
4
5
6
7
8
9
10
11
12
// InstallV2 builds the OpenAPI v2 (Swagger) spec from the web services
// registered on c, prunes defaults from its definitions, and serves it at
// /openapi/v2 on mux.
//
// It returns the OpenAPIService that serves the spec, together with the spec
// itself. If the spec cannot be built, it calls klog.Fatalf.
func (oa OpenAPI) InstallV2(c *restful.Container, mux *mux.PathRecorderMux) (*handler.OpenAPIService, *spec.Swagger) {
	services := restfuladapter.AdaptWebServices(c.RegisteredWebServices())
	swagger, err := builder2.BuildOpenAPISpecFromRoutes(services, oa.Config)
	if err != nil {
		klog.Fatalf("Failed to build open api spec for root: %v", err)
	}
	swagger.Definitions = handler.PruneDefaults(swagger.Definitions)

	service := handler.NewOpenAPIService(swagger)
	service.RegisterOpenAPIVersionedService("/openapi/v2", mux)
	return service, swagger
}

将 openAPI Install 到当前的 API Server。

当用户请求 /openapi/v2 ,spec 信息会被 mux 返回。

注册APIGroup

也就是注册APIGroup下的所有resource。

ExtensionsServer

在创建 Server Chain 时,创建的 ExtensionsServer

创建 genericapiserver

cmd/kube-apiserver/app/server.go

1
2
3
4
5
// CreateServerChain (excerpt): the apiextensions server is created first. Its
// delegate is NewEmptyDelegateWithCustomHandler(notFoundHandler), presumably a
// terminal delegate that serves notFoundHandler — confirm.
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
	...
	apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
	...
}

genericServer 提供了一个通用的 http server,定义了通用的模板,例如地址、端口、认证、授权、健康检查等等通用功能。

无论是 APIServer 还是 APIExtensionsServer 都依赖于 genericServer。

注册 APIGroup

vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// New builds the apiextensions (CRD) server (excerpt). When the
// customresourcedefinitions resource is enabled in the merged resource config,
// it registers REST storage for the resource and its /status subresource under
// the v1 version, then installs the API group on the GenericAPIServer.
// NOTE(review): `s` is created in the elided part of this excerpt.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	...
	apiResourceConfig := c.GenericConfig.MergedResourceConfig
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
	storage := map[string]rest.Storage{}
	// customresourcedefinitions, i.e. CRDs
	if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
		customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
		if err != nil {
			return nil, err
		}
		storage[resource] = customResourceDefinitionStorage
		storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
	}
	// Only register a version when it has at least one resource.
	if len(storage) > 0 {
		apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
	}

	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}
	...
}

这里有两个需要注意的地方:storage 和 InstallAPIGroup

storage

rest.Storage 是一个接口,可以理解为一个跟 etcd 交互的接口。

vendor/k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// REST implements a RESTStorage for CustomResourceDefinition objects.
// Embedding *genericregistry.Store promotes the store's CRUD and watch methods.
type REST struct {
	*genericregistry.Store
}

// NewREST builds the REST storage for customresourcedefinitions.
//
// A single strategy from NewStrategy(scheme) serves as the create, update,
// delete and reset-fields strategy. The store is completed with optsGetter and
// GetAttrs; any error from that step is returned.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
	crdStrategy := NewStrategy(scheme)

	s := &genericregistry.Store{
		NewFunc:                   func() runtime.Object { return &apiextensions.CustomResourceDefinition{} },
		NewListFunc:               func() runtime.Object { return &apiextensions.CustomResourceDefinitionList{} },
		PredicateFunc:             MatchCustomResourceDefinition,
		DefaultQualifiedResource:  apiextensions.Resource("customresourcedefinitions"),
		SingularQualifiedResource: apiextensions.Resource("customresourcedefinition"),

		CreateStrategy:      crdStrategy,
		UpdateStrategy:      crdStrategy,
		DeleteStrategy:      crdStrategy,
		ResetFieldsStrategy: crdStrategy,

		// TODO: define table converter that exposes more than name/creation timestamp
		TableConvertor: rest.NewDefaultTableConvertor(apiextensions.Resource("customresourcedefinitions")),
	}

	if err := s.CompleteWithOptions(&generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}); err != nil {
		return nil, err
	}
	return &REST{Store: s}, nil
}

vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Store implements k8s.io/apiserver/pkg/registry/rest.StandardStorage.
// It is intended to be embeddable and lets the consumer implement any
// non-generic functions that are required. This object is intended to be
// copyable so that it can be used in different ways but share the same
// underlying behavior. All fields are required unless specified.
// This type is intended to be embedded within a Kind-specific RESTStorage
// implementation. It provides CRUD semantics on a Kubelike resource, handling
// details such as conflict detection with ResourceVersion and semantics.
// The RESTCreateStrategy, RESTUpdateStrategy and RESTDeleteStrategy are generic
// across all backends and encapsulate logic specific to the API.
// TODO: make the default exposed methods exactly match a generic RESTStorage
// (Excerpt: "..." marks omitted fields.)
type Store struct {
	// NewFunc returns a new instance of the type this registry returns for a
	// GET of a single object, e.g.:
	//
	// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
	NewFunc func() runtime.Object

	// NewListFunc returns a new list of the type this registry; it is the
	// type returned when the resource is listed, e.g.:
	//
	// curl GET /apis/group/version/namespaces/my-ns/myresource
	NewListFunc func() runtime.Object
	...
}

注册了 CRD 对应的 Store 后,即可实现其对应的 CRUD 方法,例如查询某个 CRD 对象对应的 api 是:

1
2
// 命名空间级资源的通用形式: GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
// CRD 本身是集群级资源,路径中没有 namespaces 段:
GET /apis/apiextensions.k8s.io/v1/customresourcedefinitions/${crd-name}

Store 可以实现的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
New() Object
Destroy()
NewList() Object
NamespaceScoped() bool
GetCreateStrategy() RESTCreateStrategy
GetUpdateStrategy() RESTUpdateStrategy
GetDeleteStrategy() RESTDeleteStrategy
List(ctx context.Context, options *ListOptions) (Object, error)
ListPredicate(ctx context.Context, p SelectionPredicate, options *ListOptions) (Object, error)
Create(ctx context.Context, obj Object, createValidation ValidateObjectFunc, options *CreateOptions) (Object, error)
Update(ctx context.Context, name string, objInfo UpdatedObjectInfo, createValidation ValidateObjectFunc, updateValidation ValidateObjectUpdateFunc, forceAllowCreate bool, options *UpdateOptions) (Object, bool, error)
Get(ctx context.Context, name string, options *GetOptions) (Object, error)
Delete(ctx context.Context, name string, deleteValidation ValidateObjectFunc, options *DeleteOptions) (Object, bool, error)
DeleteReturnsDeletedObject() bool
DeleteCollection(ctx context.Context, deleteValidation ValidateObjectFunc, options *DeleteOptions, listOptions *ListOptions) (Object, error)
Watch(ctx context.Context, options *ListOptions) (Interface, error)
WatchPredicate(ctx context.Context, p SelectionPredicate, resourceVersion string, sendInitialEvents *bool) (Interface, error)
CompleteWithOptions(options *StoreOptions) error
ConvertToTable(ctx context.Context, object Object, tableOptions Object) (*Table, error)
StorageVersion() GroupVersioner
GetResetFields() map[APIVersion]*Set
GetSingularName() string

InstallAPIGroup

vendor/k8s.io/apiserver/pkg/server/genericapiserver.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// InstallAPIGroup exposes a single API group in the API. It is the
// one-element form of InstallAPIGroups.
// The apiGroupInfo passed in must not be used elsewhere, because its
// underlying storage is destroyed when this server shuts down.
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
	groups := []*APIGroupInfo{apiGroupInfo}
	return s.InstallAPIGroups(groups...)
}

// InstallAPIGroups (excerpt) installs the resources of every given API group
// under APIGroupPrefix. For each group it also registers discovery: the group
// is added to DiscoveryGroupManager, and a group handler web service is added
// to the go-restful container.
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
	...
	// Install every resource of each API group, then set up its discovery.
	for _, apiGroupInfo := range apiGroupInfos {
		if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
			return fmt.Errorf("unable to install api resources: %v", err)
		}

		// setup discovery
		// Install the version handler.
		// Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
		apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
		for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
			// Check the config to make sure that we elide versions that don't have any resources
			if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
				continue
			}
			apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
				GroupVersion: groupVersion.String(),
				Version:      groupVersion.Version,
			})
		}
		// The first prioritized version is advertised as the preferred version.
		// NOTE(review): PrioritizedVersions[0] panics on an empty slice; presumably
		// this is validated in the elided code — confirm.
		preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{
			GroupVersion: apiGroupInfo.PrioritizedVersions[0].String(),
			Version:      apiGroupInfo.PrioritizedVersions[0].Version,
		}
		apiGroup := metav1.APIGroup{
			Name:             apiGroupInfo.PrioritizedVersions[0].Group,
			Versions:         apiVersionsForDiscovery,
			PreferredVersion: preferredVersionForDiscovery,
		}

		s.DiscoveryGroupManager.AddGroup(apiGroup)
		s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
	}
	...
}

// installAPIResources (excerpt): for each prioritized version of the group,
// the REST handlers are installed into the server's go-restful container via
// APIGroupVersion.InstallREST.
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, typeConverter managedfields.TypeConverter) error {
	var resourceInfos []*storageversion.ResourceInfo
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
		...
		discoveryAPIResources, r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
		...
	}

vendor/k8s.io/apiserver/pkg/endpoints/groupversion.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// InstallREST registers the REST handlers (storage, watch, proxy and redirect)
// for this group version into container.
//
// All routes are served under the path Root/Group/Version, joined with
// path.Join. Root must not end in a slash. A version-discovery handler is added
// to the same web service. The function returns:
//   - the aggregated discovery resources;
//   - the persisted resource infos;
//   - an aggregate of all registration and conversion errors.
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
	installer := &APIInstaller{
		group:             g,
		prefix:            path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version),
		minRequestTimeout: g.MinRequestTimeout,
	}

	resources, infos, ws, errs := installer.Install()
	discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{resources}).AddToWebService(ws)
	container.Add(ws)

	discoveryResources, convErr := ConvertGroupVersionIntoToDiscovery(resources)
	if convErr != nil {
		errs = append(errs, convErr)
	}
	return discoveryResources, removeNonPersistedResources(infos), utilerrors.NewAggregate(errs)
}

vendor/k8s.io/apiserver/pkg/endpoints/installer.go 中定义的

1
2
3
4
5
// APIInstaller registers the REST routes of one APIGroupVersion onto a
// go-restful WebService.
type APIInstaller struct {
	// group is the API group version whose storage is being installed.
	group *APIGroupVersion
	prefix string // Path prefix where API resources are to be registered.
	// minRequestTimeout is copied from APIGroupVersion.MinRequestTimeout.
	minRequestTimeout time.Duration
}

prefix 代表的就是路径。

vendor/k8s.io/apiserver/pkg/endpoints/installer.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Install (excerpt) registers the handlers for every storage path of the group
// onto a single WebService.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
	...
	for _, path := range paths {
		apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		...
	}

// registerResourceHandlers (excerpt) does three things:
//  1. detects, via type assertions, which rest.* interfaces the storage implements;
//  2. turns the supported verbs into actions;
//  3. registers one go-restful route per action on ws.
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
	...
	// what verbs are supported by the storage, used to know what verbs we support per path
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
	gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
	collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
	updater, isUpdater := storage.(rest.Updater)
	patcher, isPatcher := storage.(rest.Patcher)
	watcher, isWatcher := storage.(rest.Watcher)
	connecter, isConnecter := storage.(rest.Connecter)
	storageMeta, isMetadata := storage.(rest.StorageMetadata)
	storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
	gvAcceptor, _ := storage.(rest.GroupVersionAcceptor)
	...

	// 2. Add the actions for this resource. appendIf only adds an action when
	// the storage supports the corresponding verb.
	// Handler for standard REST verbs (GET, PUT, POST and DELETE).
	// Add actions at the resource path: /api/apiVersion/resource
	actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
	actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
	actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
	// DEPRECATED in 1.11
	actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

	// Add actions at the item path: /api/apiVersion/resource/{name}
	actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
	if getSubpath {
		actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
	}
	...
	// Shared request scope used when building a route for each action.
	kubeVerbs := map[string]struct{}{}
	reqScope := handlers.RequestScope{
		Serializer:     a.group.Serializer,
		ParameterCodec: a.group.ParameterCodec,
		Creater:        a.group.Creater,
		Convertor:      a.group.Convertor,
		...

	// Map rest.Storage actions onto restful.Route values,
	// attaching the matching handler to each one.
	for _, action := range actions {
		producedObject := storageMeta.ProducesObject(action.Verb)
		if producedObject == nil {
			producedObject = defaultVersionedObject
		}
		reqScope.Namer = action.Namer

		// Derive the request scope (cluster/namespace/resource) and the
		// operation-name suffix from the action's path and flags.
		requestScope := "cluster"
		var namespaced string
		var operationSuffix string
		if apiResource.Namespaced {
			requestScope = "namespace"
			namespaced = "Namespaced"
		}
		if strings.HasSuffix(action.Path, "/{path:*}") {
			requestScope = "resource"
			operationSuffix = operationSuffix + "WithPath"
		}
		if strings.Contains(action.Path, "/{name}") || action.Verb == "POST" {
			requestScope = "resource"
		}
		if action.AllNamespaces {
			requestScope = "cluster"
			operationSuffix = operationSuffix + "ForAllNamespaces"
			namespaced = ""
		}
		...
		// Bind a route to the handler for each verb.
		switch action.Verb {
		case "GET": // Get a resource.
		case "LIST": // List all resources of a kind.
		case "PUT": // Update a resource.
			...
			for _, route := range routes {
				route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
					Group:   reqScope.Kind.Group,
					Version: reqScope.Kind.Version,
					Kind:    reqScope.Kind.Kind,
				})
				route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
				// Register the route on the go-restful WebService.
				ws.Route(route)
			}
			...
		}

registerResourceHandlers 实现了 rest.Storage 到 restful.Route 的转换:它首先判断 API Resource 所支持的 REST 接口,然后为每个 REST 接口添加对应的 handler,最后将其注册到路由中。

例如 POST

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
case "POST": // Create a resource.
	// Pick the named or unnamed create handler depending on the storage's interface.
	var handler restful.RouteFunction
	if isNamedCreater {
		handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
	} else {
		handler = restfulCreateResource(creater, reqScope, admit)
	}
	// Instrument the handler with request metrics (observability).
	handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
	// Wrap the handler again to attach warnings (decorator-style chaining).
	handler = utilwarning.AddWarningsHandler(handler, warnings)
	article := GetArticleForNoun(kind, " ")
	doc := "create" + article + kind
	if isSubresource {
		doc = "create " + subresource + " of" + article + kind
	}
	route := ws.POST(action.Path).To(handler).
		Doc(doc).
		Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed. Defaults to 'false' unless the user-agent indicates a browser or command-line HTTP tool (curl and wget).")).
		Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
		Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
		Returns(http.StatusOK, "OK", producedObject).
		// TODO: in some cases, the API may return a v1.Status instead of the versioned object
		// but currently go-restful can't handle multiple different objects being returned.
		Returns(http.StatusCreated, "Created", producedObject).
		Returns(http.StatusAccepted, "Accepted", producedObject).
		Reads(defaultVersionedObject).
		Writes(producedObject)
	if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
		return nil, nil, err
	}
	addParams(route, action.Params)
	routes = append(routes, route)

Master APIServer

创建 master apiserver 也是一样的逻辑

创建 kubeAPIServer

cmd/kube-apiserver/app/server.go

1
2
3
4
5
// CreateServerChain (excerpt): the kube-apiserver (control plane) server is
// created with the apiextensions server's GenericAPIServer as its delegate.
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
	...
	kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
	...
}

创建时会 Install 所有的 API:

pkg/controlplane/instance.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// New (excerpt): all enabled REST storage providers are installed while the
// Instance is being created.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	...
	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
		return nil, err
	}
	...

// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
// Providers with an empty group name (the core/legacy group) are installed
// under DefaultLegacyAPIPrefix; all other groups are collected and installed
// together via InstallAPIGroups.
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
	...
	for _, restStorageBuilder := range restStorageProviders {
		groupName := restStorageBuilder.GroupName()
		apiGroupInfo, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
		if err != nil {
			return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
		}
		...

		if len(groupName) == 0 {
			// the legacy group for core APIs is special that it is installed into /api via this special install method.
			// DefaultLegacyAPIPrefix is that /api prefix.
			if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
				return fmt.Errorf("error in registering legacy API: %w", err)
			}
		} else {
			// everything else goes to /apis
			nonLegacy = append(nonLegacy, &apiGroupInfo)
		}
	}

	// Install all non-legacy groups (under /apis) in a single call.
	// NOTE(review): this error uses %v while the legacy branch wraps with %w;
	// callers cannot errors.Is/As through this one.
	if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}

接下来又回到了 vendor/k8s.io/apiserver/pkg/server/genericapiserver.go ,跟 APIExtensionsServer 一致。

APIServer 启动分为以下几步:

1、创建 GenericServer

2、实例化 master APIServer(Kube APIServer)

3、注册 /api 下的资源(LegacyAPI)

4、注册 /apis下的资源(nonLegacy)

与启动 APIExtensionsServer 的流程差不多。

至此,CreateServerChain 中流程已经分析完,其中的调用链如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
                    |--> CreateNodeDialer
|
|--> CreateKubeAPIServerConfig
|
CreateServerChain --|--> createAPIExtensionsConfig
|
| |--> c.GenericConfig.New
|--> createAPIExtensionsServer --> apiextensionsConfig.Complete().New --|
| |--> s.GenericAPIServer.InstallAPIGroup
|
| |--> c.GenericConfig.New --> legacyRESTStorageProvider.NewLegacyRESTStorage
| |
|--> CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --|--> m.InstallLegacyAPI
| |
| |--> m.InstallAPIs
|
|
|--> createAggregatorConfig
|
| |--> c.GenericConfig.New
| |
|--> createAggregatorServer --> aggregatorConfig.Complete().NewWithDelegate --|--> apiservicerest.NewRESTStorage
|
|--> s.GenericAPIServer.InstallAPIGroup

启动 Web 服务

所有的配置、resource、chain handler 注册好了之后,启动 Web 服务

cmd/kube-apiserver/app/server.go

1
2
3
4
5
6
7
8
9
// Run (excerpt): once the server chain is built, it is prepared and then run
// with stopCh. Errors from PrepareRun are returned immediately.
func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
	...
	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	return prepared.Run(stopCh)
}

vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

1
2
3
// Run starts the prepared aggregator by delegating to its runnable and
// returning that runnable's error.
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
	runner := s.runnable
	return runner.Run(stopCh)
}

其实是 prepared 的 Run 方法:

vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

1
2
3
4
5
6
7
8
9
10
// PrepareRun (excerpt) prepares the embedded generic API server and wraps the
// result in a preparedAPIAggregator whose runnable is that prepared server.
// The elided lines perform further setup not shown in this excerpt.
func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
...
// The prepared generic server becomes the runnable that Run delegates to.
prepared := s.GenericAPIServer.PrepareRun()
...
return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
}

// Run forwards stopCh to s.runnable, i.e. the value produced by
// s.GenericAPIServer.PrepareRun() inside PrepareRun, and returns its error.
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
return s.runnable.Run(stopCh)
}

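preparedGenericAPIServer.Run 利用 channel 编排了 server 生命周期中的状态流转:stopCh 关闭后,一边等待 ShutdownDelayDuration,一边执行 PreShutdownHooks,两者都完成后才进入 NotAcceptingNewRequest(停止接收新请求)阶段;之后根据是否开启 ShutdownSendRetryAfter 决定何时停止 HTTP Server,并在在途请求排空(drainedCh)后关闭审计后端。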

vendor/k8s.io/apiserver/pkg/server/genericapiserver.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
// This is the diagram of what channels/signals are dependent on each other:
//
// |                                 stopCh
// |                                    |
// |           ---------------------------------------------------------
// |           |                                                       |
// |    ShutdownInitiated (shutdownInitiatedCh)                        |
// |           |                                                       |
// | (ShutdownDelayDuration)                                  (PreShutdownHooks)
// |           |                                                       |
// |  AfterShutdownDelayDuration (delayedStopCh)   PreShutdownHooksStopped (preShutdownHooksHasStoppedCh)
// |           |                                                       |
// |           |-------------------------------------------------------|
// |                                    |
// |                                    |
// |            NotAcceptingNewRequest (notAcceptingNewRequestCh)
// |                                    |
// |                                    |
// |           |----------------------------------------------------------------------------------|
// |           |                        |              |                                          |
// |        [without                 [with             |                                          |
// | ShutdownSendRetryAfter]  ShutdownSendRetryAfter]  |                                          |
// |           |                        |              |                                          |
// |           |                        ---------------|                                          |
// |           |                                       |                                          |
// |           |                      |----------------|-----------------------|                  |
// |           |                      |                                        |                  |
// |           |   (NonLongRunningRequestWaitGroup::Wait)        (WatchRequestWaitGroup::Wait)    |
// |           |                      |                                        |                  |
// |           |                      |------------------|---------------------|                  |
// |           |                                         |                                        |
// |           |                        InFlightRequestsDrained (drainedCh)                       |
// |           |                                         |                                        |
// |           |-------------------|---------------------|----------------------------------------|
// |                               |                     |
// |                       stopHttpServerCh   (AuditBackend::Shutdown())
// |                               |
// |                       listenerStoppedCh
// |                               |
// |   HTTPServerStoppedListening (httpServerStoppedListeningCh)
//
// NOTE(excerpt): the "..." lines elide upstream code; drainedCh,
// stopHttpServerCh and shutdownTimeout used below are defined there.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
...
// Start the audit backend before any request comes in. This means we must call Backend.Run
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
// AuditBackend.Run will stop as soon as all in-flight requests are drained.
// Audit: the backend is optional (nil check) and is handed drainedCh.Signaled()
// as its stop signal.
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(drainedCh.Signaled()); err != nil {
return fmt.Errorf("failed to run the audit backend: %v", err)
}
}

// Start serving without blocking. The server is driven by stopHttpServerCh,
// not by stopCh directly, so (per the diagram above) the HTTP server stops only
// after the shutdown stages that feed stopHttpServerCh have completed.
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
if err != nil {
return err
}
...
}

// NonBlockingRun (excerpt) starts the secure server without blocking the caller.
// In Run, stopCh here is the caller's stopHttpServerCh, not the top-level stopCh.
// The server is started only when both SecureServingInfo and Handler are set;
// Serve yields stoppedCh and listenerStoppedCh, which the caller receives
// (the final return is elided in this excerpt).
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
...
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
// internalStopCh is created in the elided code above.
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
if err != nil {
// Serving failed to start: close internalStopCh before returning the error.
close(internalStopCh)
return nil, nil, err
}
}
...
}

配置 TLS(以及 HTTP/2)相关参数,构造 http.Server 并启动:

vendor/k8s.io/apiserver/pkg/server/secure_serving.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Serve (excerpt) builds a standard-library http.Server around handler, bound to
// the address of s.Listener, and hands it to RunServer together with the
// listener, shutdownTimeout and stopCh. tlsConfig is built in the elided code.
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
...
secureServer := &http.Server{
Addr: s.Listener.Addr().String(),
Handler: handler,
MaxHeaderBytes: 1 << 20, // caps request header size at 1 MiB
TLSConfig: tlsConfig,

IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
ReadHeaderTimeout: 32 * time.Second, // just shy of requestTimeoutUpperBound
}
...
// RunServer's return values are passed straight back to the caller.
return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}

最终通过 RunServer,使用 Go 标准库的 http.Server 在 s.Listener 上启动 HTTPS 服务。

References

图解kubernetes中的api多版本中反序列化与转换

Kubernetes源码学习-APIServer-P1-基础结构信息

kube-apiserver 的设计与实现