cmd := &cobra.Command{ Use: "kube-apiserver", Long: `The Kubernetes API server validates and configures data for the api objects which include pods, services, replicationcontrollers, and others. The API Server services REST operations and provides the frontend to the cluster's shared state through which all other components interact.`,
// stop printing usage when the command errors SilenceUsage: true, PersistentPreRunE: func(*cobra.Command, []string)error { // silence client-go warnings. // kube-apiserver loopback clients should not log self-issued warnings. rest.SetDefaultWarningHandler(rest.NoWarnings{}) returnnil }, // 核心逻辑,RunE 相比 Run 会返回 Error RunE: func(cmd *cobra.Command, args []string)error { verflag.PrintAndExitIfRequested() fs := cmd.Flags()
// Activate logging as soon as possible, after that // show flags with the final logging configuration. if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil { return err } cliflag.PrintFlags(fs)
// set default options // 补全默认值 completedOptions, err := s.Complete() if err != nil { return err }
// validate options // 校验配置是否合法或者是否有冲突 if errs := completedOptions.Validate(); len(errs) != 0 { return utilerrors.NewAggregate(errs) } // add feature enablement metrics utilfeature.DefaultMutableFeatureGate.AddMetrics() // 运行 Run 方法 // 也是通过 chan 做服务停止的信号处理 return Run(completedOptions, genericapiserver.SetupSignalHandler()) }, Args: func(cmd *cobra.Command, args []string)error { for _, arg := range args { iflen(arg) > 0 { return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args) } } returnnil }, }
// 将命令行指定的配置和 fs 关联起来 fs := cmd.Flags() namedFlagSets := s.Flags() verflag.AddFlags(namedFlagSets.FlagSet("global")) globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags()) options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic")) for _, f := range namedFlagSets.FlagSets { fs.AddFlagSet(f) }
// Run runs the specified APIServer. This should never exit. // 也就是执行逻辑,将命令行跑起来 funcRun(opts options.CompletedOptions, stopCh <-chanstruct{})error { // To help debugging, immediately log version klog.Infof("Version: %+v", version.Get())
// CreateServerChain creates the apiservers connected via delegation. funcCreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) { // notFoundHandler ,前面所有的 chan 都没有命中,就是用 notFoundHandler notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey) // ExtensionsServer:负责 CRD 的处理,也就是 Custom API Server apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler)) if err != nil { returnnil, err } crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
// kubeAPIServer 负责 build-in 的 API Object 相关的处理 kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer) if err != nil { returnnil, err }
// aggregator comes last in the chain // 负责转发请求到 API Server 或者 Custom API Server aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled) if err != nil { // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines returnnil, err }
return aggregatorServer, nil }
API Server 的 Instance 是通过 config.ControlPlane.New(apiExtensionsServer.GenericAPIServer) 实现的
func(c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) { ... // 这个就是返回的 Instance m := &Instance{ GenericAPIServer: s, ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo, } ... // 注册核心的方法,用于响应 REST 请求,用于处理 元老级资源请求 legacyRESTStorageProvider, err := corerest.New(corerest.Config{ GenericConfig: corerest.GenericConfig{ StorageFactory: c.ExtraConfig.StorageFactory, EventTTL: c.ExtraConfig.EventTTL, LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig, ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer, ExtendExpiration: c.ExtraConfig.ExtendExpiration, ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration, APIAudiences: c.GenericConfig.Authentication.APIAudiences, Informers: c.ExtraConfig.VersionedInformers, }, Proxy: corerest.ProxyConfig{ Transport: c.ExtraConfig.ProxyTransport, KubeletClientConfig: c.ExtraConfig.KubeletClientConfig, }, Services: corerest.ServicesConfig{ ClusterIPRange: c.ExtraConfig.ServiceIPRange, SecondaryClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange, NodePortRange: c.ExtraConfig.ServiceNodePortRange, IPRepairInterval: c.ExtraConfig.RepairServicesInterval, }, }) if err != nil { returnnil, err } ... // The order here is preserved in discovery. // If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"), // the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer. // This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go // with specific priorities. // TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery // handlers that we have. 
// 所有的 Provider,用于响应 REST 请求,元老级资源和非元老级资源 // 如果具有相同名称的资源存在于这些组中的多个组中(例如“deployments.apps”和“deployments.extensions”),则此列表的顺序决定未经限定的资源名称(例如“deployments”)应优先选择哪个组。 // 此优先级顺序用于本地发现,但最终会聚合到`k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go` 中,并具有特定优先级。 restStorageProviders := []RESTStorageProvider{ legacyRESTStorageProvider, apiserverinternalrest.StorageProvider{}, authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences}, authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver}, autoscalingrest.RESTStorageProvider{}, batchrest.RESTStorageProvider{}, certificatesrest.RESTStorageProvider{}, coordinationrest.RESTStorageProvider{}, discoveryrest.StorageProvider{}, networkingrest.RESTStorageProvider{}, noderest.RESTStorageProvider{}, policyrest.RESTStorageProvider{}, rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer}, schedulingrest.RESTStorageProvider{}, storagerest.RESTStorageProvider{}, flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory}, // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names. // See https://github.com/kubernetes/kubernetes/issues/42392 appsrest.StorageProvider{}, admissionregistrationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, DiscoveryClient: discoveryClientForAdmissionRegistration}, eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL}, resourcerest.RESTStorageProvider{}, } if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil { returnnil, err } ... }
由于 API 的划分不是根据资源类型,而是作为参数,因此需要将对应 Object 注册进来,然后针对性处理。
for _, restStorageBuilder := range restStorageProviders { groupName := restStorageBuilder.GroupName() // 生成 apiGroupInfo,包含所有元老资源信息 apiGroupInfo, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter) if err != nil { return fmt.Errorf("problem initializing API group %q : %v", groupName, err) } ... iflen(groupName) == 0 { // the legacy group for core APIs is special that it is installed into /api via this special install method. // 将元老资源都 install 到 APIGroup 里面 if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { return fmt.Errorf("error in registering legacy API: %w", err) } } else { // everything else goes to /apis nonLegacy = append(nonLegacy, &apiGroupInfo) }
// 将 非元老资源 install 进来 if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil { return fmt.Errorf("error in registering group versions: %v", err) } returnnil ... }
import ( // These imports are the API groups the API server will support. _ "k8s.io/kubernetes/pkg/apis/admission/install" _ "k8s.io/kubernetes/pkg/apis/admissionregistration/install" _ "k8s.io/kubernetes/pkg/apis/apiserverinternal/install" _ "k8s.io/kubernetes/pkg/apis/apps/install" _ "k8s.io/kubernetes/pkg/apis/authentication/install" _ "k8s.io/kubernetes/pkg/apis/authorization/install" _ "k8s.io/kubernetes/pkg/apis/autoscaling/install" _ "k8s.io/kubernetes/pkg/apis/batch/install" _ "k8s.io/kubernetes/pkg/apis/certificates/install" _ "k8s.io/kubernetes/pkg/apis/coordination/install" _ "k8s.io/kubernetes/pkg/apis/core/install" _ "k8s.io/kubernetes/pkg/apis/discovery/install" _ "k8s.io/kubernetes/pkg/apis/events/install" _ "k8s.io/kubernetes/pkg/apis/extensions/install" _ "k8s.io/kubernetes/pkg/apis/flowcontrol/install" _ "k8s.io/kubernetes/pkg/apis/imagepolicy/install" _ "k8s.io/kubernetes/pkg/apis/networking/install" _ "k8s.io/kubernetes/pkg/apis/node/install" _ "k8s.io/kubernetes/pkg/apis/policy/install" _ "k8s.io/kubernetes/pkg/apis/rbac/install" _ "k8s.io/kubernetes/pkg/apis/resource/install" _ "k8s.io/kubernetes/pkg/apis/scheduling/install" _ "k8s.io/kubernetes/pkg/apis/storage/install" )
例如
1 2 3 4 5 6 7 8 9 10 11 12
funcinit() { Install(legacyscheme.Scheme) }
// Install registers the API group and adds types to a scheme funcInstall(scheme *runtime.Scheme) { // 依次将group、version注册到了scheme中,并设置各version的优先级 utilruntime.Must(events.AddToScheme(scheme)) utilruntime.Must(v1beta1.AddToScheme(scheme)) utilruntime.Must(v1.AddToScheme(scheme)) utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion)) }
将 scheme 导入到 API Server 中
pkg/api/legacyscheme/scheme.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Package-level scheme and the codecs derived from it. Codecs and
// ParameterCodec are both constructed from Scheme.
var (
	// Scheme is the default instance of runtime.Scheme to which types in the Kubernetes API are already registered.
	// NOTE: If you are copying this file to start a new api group, STOP! Copy the
	// extensions group instead. This Scheme is special and should appear ONLY in
	// the api group, unless you really know what you're doing.
	// TODO(lavalamp): make the above error impossible.
	Scheme = runtime.NewScheme()

	// Codecs provides access to encoding and decoding for the scheme
	Codecs = serializer.NewCodecFactory(Scheme)

	// ParameterCodec handles versioning of objects that are converted to query parameters.
	ParameterCodec = runtime.NewParameterCodec(Scheme)
)
Scheme
Scheme 是一个类型注册表:它定义了序列化和反序列化 API 对象的方法,负责在 Group、Version、Kind(GVK)信息与 Go 类型之间相互转换,并维护不同版本的 Go 类型之间的映射。
Scheme 是随时间演进的版本化 API 和版本化配置的基础。
在一个 Scheme 中,Type 是特定的 Go 结构体,Version 是表示该 Type 特定表现形式的时点标识符(通常向后兼容),Kind 是该 Version 内 Type 的唯一名称,Group 标识一组随时间演变的 Versions、Kinds 和 Types。
Unversioned Type 指尚未正式绑定到某个类型、但承诺保持向后兼容的类型(实际上相当于一个预计未来不会出现破坏性变更的 “v1” 类型)。
Scheme 预计不会在运行时发生变化,并且只有在注册完成之后才是线程安全的。
总的来说,Scheme 是一个结构体,内含用于在不同外部 Version 之间转换、以及在 GVK 和 Go Type 之间转换的数据和方法。
// Scheme defines methods for serializing and deserializing API objects, a type
// registry for converting group, version, and kind information to and from Go
// schemas, and mappings between Go schemas of different versions. A scheme is the
// foundation for a versioned API and versioned configuration over time.
//
// In a Scheme, a Type is a particular Go struct, a Version is a point-in-time
// identifier for a particular representation of that Type (typically backwards
// compatible), a Kind is the unique name for that Type within the Version, and a
// Group identifies a set of Versions, Kinds, and Types that evolve over time. An
// Unversioned Type is one that is not yet formally bound to a type and is promised
// to be backwards compatible (effectively a "v1" of a Type that does not expect
// to break in the future).
//
// Schemes are not expected to change at runtime and are only threadsafe after
// registration is complete.
type Scheme struct {
	// gvkToType allows one to figure out the go type of an object with
	// the given version and name.
	// It stores the GVK-to-Type mapping; one GVK corresponds to exactly one Type.
	gvkToType map[schema.GroupVersionKind]reflect.Type

	// Almost all resources carry a version, so this is the commonly used
	// mapping. It stores the Type-to-GVK mapping; unlike gvkToType, a single
	// Type may correspond to several GVKs.
	// typeToGVK allows one to find metadata for a given go object.
	// The reflect.Type we index by should *not* be a pointer.
	typeToGVK map[reflect.Type][]schema.GroupVersionKind

	// Unversioned resources. These are used very rarely in current Kubernetes
	// versions and can mostly be ignored.
	// unversionedTypes are transformed without conversion in ConvertToVersion.
	unversionedTypes map[reflect.Type]schema.GroupVersionKind

	// unversionedKinds are the names of kinds that can be created in the context of any group
	// or version
	// TODO: resolve the status of unversioned types.
	unversionedKinds map[string]reflect.Type

	// Map from version and resource to the corresponding func to convert
	// resource field labels in that version to internal version.
	fieldLabelConversionFuncs map[schema.GroupVersionKind]FieldLabelConversionFunc

	// defaulterFuncs is a map to funcs to be called with an object to provide defaulting
	// the provided object must be a pointer.
	// These functions set default values.
	defaulterFuncs map[reflect.Type]func(interface{})

	// converter stores all registered conversion functions. It also has
	// default converting behavior.
	// It holds the functions that convert a resource between versions.
	converter *conversion.Converter

	// versionPriority is a map of groups to ordered lists of versions for those groups indicating the
	// default priorities of these versions as registered in the scheme
	versionPriority map[string][]string

	// observedVersions keeps track of the order we've seen versions during type registration
	observedVersions []schema.GroupVersion

	// schemeName is the name of this scheme. If you don't specify a name, the stack of the NewScheme caller will be used.
	// This is useful for error reporting to indicate the origin of the scheme.
	schemeName string
}
1 2 3 4 5 6 7
// GroupVersionKind unambiguously identifies a kind. It doesn't anonymously include GroupVersion
// to avoid automatic coercion. It doesn't use a GroupVersion to avoid custom marshalling
//
// All three components are plain strings.
type GroupVersionKind struct {
	Group   string
	Version string
	Kind    string
}
var (
	// TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api.
	// localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.

	// SchemeBuilder is created from addKnownTypes, the function that adds this
	// group's types to a scheme.
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	// localSchemeBuilder is a pointer to SchemeBuilder, so functions registered
	// through it are stored in SchemeBuilder itself.
	localSchemeBuilder = &SchemeBuilder
	// AddToScheme is the AddToScheme method of localSchemeBuilder.
	AddToScheme = localSchemeBuilder.AddToScheme
)
// Adds the list of known types to the given scheme. funcaddKnownTypes(scheme *runtime.Scheme)error { scheme.AddKnownTypes(SchemeGroupVersion, &Deployment{}, &DeploymentList{}, &StatefulSet{}, &StatefulSetList{}, &DaemonSet{}, &DaemonSetList{}, &ReplicaSet{}, &ReplicaSetList{}, &ControllerRevision{}, &ControllerRevisionList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) returnnil }
注入的内容就是 kubernetes 里面的 Object
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Deployment enables declarative updates for Pods and ReplicaSets.
type Deployment struct {
	// TypeMeta is embedded and serialized inline (see the json tag).
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Specification of the desired behavior of the Deployment.
	// +optional
	Spec DeploymentSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// Most recently observed status of the Deployment.
	// +optional
	Status DeploymentStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
// Object interface must be supported by all API types registered with Scheme. Since objects in a scheme are
// expected to be serialized to the wire, the interface an Object must provide to the Scheme allows
// serializers to set the kind, version, and group the object is represented as. An Object may choose
// to return a no-op ObjectKindAccessor in cases where it is not expected to be serialized.
type Object interface {
	// GetObjectKind returns the schema.ObjectKind accessor for this object.
	GetObjectKind() schema.ObjectKind
	// DeepCopyObject returns a copy of this object as an Object.
	DeepCopyObject() Object
}
// All objects that are serialized from a Scheme encode their type information. This interface is used
// by serialization to set type information from the Scheme onto the serialized version of an object.
// For objects that cannot be serialized or have unique requirements, this interface may be a no-op.
type ObjectKind interface {
	// SetGroupVersionKind sets or clears the intended serialized kind of an object. Passing kind nil
	// should clear the current setting.
	// NOTE(review): kind is passed by value, so "nil" presumably means the
	// zero-value GroupVersionKind — confirm against implementations.
	SetGroupVersionKind(kind GroupVersionKind)
	// GroupVersionKind returns the stored group, version, and kind of an object, or an empty struct
	// if the object does not expose or provide these fields.
	GroupVersionKind() GroupVersionKind
}
// CodecFactory provides methods for retrieving codecs and serializers for specific // versions and content types. type CodecFactory struct { scheme *runtime.Scheme universal runtime.Decoder accepts []runtime.SerializerInfo
// yamlSerializer converts YAML passed to the Decoder methods to JSON.
// It embeds a runtime.Serializer, so every method it does not define itself
// is provided by the nested serializer.
type yamlSerializer struct {
	// the nested serializer
	runtime.Serializer
}

// yamlSerializer implements Serializer
// (compile-time assertion; the blank identifier discards the value).
var _ runtime.Serializer = yamlSerializer{}
// NewDecodingSerializer adds YAML decoding support to a serializer that supports JSON. funcNewDecodingSerializer(jsonSerializer runtime.Serializer) runtime.Serializer { return &yamlSerializer{jsonSerializer} }
func(c yamlSerializer) Decode(data []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { out, err := yaml.ToJSON(data) if err != nil { returnnil, nil, err } data = out return c.Serializer.Decode(data, gvk, into) }
// APIResource specifies the name of a resource and whether it is namespaced. type APIResource struct { ... // verbs is a list of supported kube verbs (this includes get, list, watch, create, // update, patch, delete, deletecollection, and proxy) Verbs Verbs `json:"verbs" protobuf:"bytes,4,opt,name=verbs"` ... }
// Verbs masks the value so protobuf can generate
//
// +protobuf.nullable=true
// +protobuf.options.(gogoproto.goproto_stringer)=false
//
// The underlying type is a slice of strings.
type Verbs []string
// StatefulSet as declared here carries no json/protobuf struct tags on any of
// its fields.
type StatefulSet struct {
	metav1.TypeMeta
	// +optional
	metav1.ObjectMeta

	// Spec defines the desired identities of pods in this set.
	// +optional
	Spec StatefulSetSpec

	// Status is the current status of Pods in this StatefulSet. This data
	// may be out of date by some window of time.
	// +optional
	Status StatefulSetStatus
}
// Install registers the API group and adds types to a scheme funcInstall(scheme *runtime.Scheme) { utilruntime.Must(apps.AddToScheme(scheme)) utilruntime.Must(v1beta1.AddToScheme(scheme)) utilruntime.Must(v1beta2.AddToScheme(scheme)) utilruntime.Must(v1.AddToScheme(scheme)) utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta2.SchemeGroupVersion, v1beta1.SchemeGroupVersion)) }
pkg/apis/apps/register.go
1 2 3 4 5 6
var (
	// SchemeBuilder stores functions to add things to a scheme.
	// It is created with addKnownTypes as its initial function.
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	// AddToScheme applies all stored functions to a scheme.
	AddToScheme = SchemeBuilder.AddToScheme
)
// StatefulSet represents a set of pods with consistent identities.
// Identities are defined as:
//   - Network: A single stable DNS and hostname.
//   - Storage: As many VolumeClaims as requested.
//
// The StatefulSet guarantees that a given network identity will always
// map to the same storage identity.
type StatefulSet struct {
	// TypeMeta is embedded and serialized inline (see the json tag).
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Spec defines the desired identities of pods in this set.
	// +optional
	Spec StatefulSetSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

	// Status is the current status of Pods in this StatefulSet. This data
	// may be out of date by some window of time.
	// +optional
	Status StatefulSetStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}
例如 V1,与 Internal Version 共用 install.go
pkg/apis/apps/v1/register.go
1 2 3 4 5 6
funcinit() { // We only register manually written functions here. The registration of the // generated functions takes place in the generated files. The separation // makes the code compile even when the generated files are missing. localSchemeBuilder.Register(addDefaultingFuncs) }
// SetDefaults_Deployment sets additional defaults compared to its counterpart // in extensions. These addons are: // - MaxUnavailable during rolling update set to 25% (1 in extensions) // - MaxSurge value during rolling update set to 25% (1 in extensions) // - RevisionHistoryLimit set to 10 (not set in extensions) // - ProgressDeadlineSeconds set to 600s (not set in extensions) funcSetDefaults_Deployment(obj *appsv1.Deployment) { // Set DeploymentSpec.Replicas to 1 if it is not set. if obj.Spec.Replicas == nil { obj.Spec.Replicas = new(int32) *obj.Spec.Replicas = 1 } strategy := &obj.Spec.Strategy // Set default DeploymentStrategyType as RollingUpdate. if strategy.Type == "" { strategy.Type = appsv1.RollingUpdateDeploymentStrategyType } if strategy.Type == appsv1.RollingUpdateDeploymentStrategyType { if strategy.RollingUpdate == nil { rollingUpdate := appsv1.RollingUpdateDeployment{} strategy.RollingUpdate = &rollingUpdate } if strategy.RollingUpdate.MaxUnavailable == nil { // Set default MaxUnavailable as 25% by default. maxUnavailable := intstr.FromString("25%") strategy.RollingUpdate.MaxUnavailable = &maxUnavailable } if strategy.RollingUpdate.MaxSurge == nil { // Set default MaxSurge as 25% by default. maxSurge := intstr.FromString("25%") strategy.RollingUpdate.MaxSurge = &maxSurge } } if obj.Spec.RevisionHistoryLimit == nil { obj.Spec.RevisionHistoryLimit = new(int32) *obj.Spec.RevisionHistoryLimit = 10 } if obj.Spec.ProgressDeadlineSeconds == nil { obj.Spec.ProgressDeadlineSeconds = new(int32) *obj.Spec.ProgressDeadlineSeconds = 600 } }
Generic Server
提供暴露 http 服务所需基础设施。将操作 API Object 的 restful 服务暴露出去。
// GenericAPIServer contains state for a Kubernetes cluster api server. type GenericAPIServer struct {
... // "Outputs" // Handler holds the handlers being used by this API server // http handler 句柄 Handler *APIServerHandler
// 一些钩子函数 // PostStartHooks are each called after the server has started listening, in a separate go func for each // with no guarantee of ordering between them. The map key is a name used for error reporting. // It may kill the process with a panic if it wishes to by returning an error. postStartHookLock sync.Mutex postStartHooks map[string]postStartHookEntry postStartHooksCalled bool disabledPostStartHooks sets.String
// ServeHTTP makes it an http.Handler
// by forwarding every request, unchanged, to a.FullHandlerChain.
func(a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	a.FullHandlerChain.ServeHTTP(w, r)
}
funcBuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c *Config) http.Handler { // WithStorageVersionPrecondition needs the WithRequestInfo to run first handler := genericapifilters.WithStorageVersionPrecondition(apiHandler, c.StorageVersionManager, c.Serializer) return DefaultBuildHandlerChain(handler, c) }
// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the // context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client. handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
// GetDefinitionName returns the name and tags for a given definition func(d *DefinitionNamer) GetDefinitionName(name string) (string, spec.Extensions) { if groupVersionKinds, ok := d.typeGroupVersionKinds[name]; ok { return friendlyName(name), spec.Extensions{ extensionGVK: groupVersionKinds.JSON(), } } return friendlyName(name), nil }
// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates. func(s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { s.delegationTarget.PrepareRun()
// REST implements a RESTStorage for API services against etcd
// It embeds *genericregistry.Store and declares no fields of its own.
type REST struct {
	*genericregistry.Store
}
// NewREST returns a RESTStorage object that will work against API services. funcNewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) { strategy := NewStrategy(scheme)
// Store implements k8s.io/apiserver/pkg/registry/rest.StandardStorage.
// It is intended to be embeddable and allows the consumer to implement any
// non-generic functions that are required. This object is intended to be
// copyable so that it can be used in different ways but share the same
// underlying behavior. All fields are required unless specified.
//
// The intended use of this type is embedding within a Kind specific
// RESTStorage implementation. This type provides CRUD semantics on a Kubelike
// resource, handling details like conflict detection with ResourceVersion and
// semantics. The RESTCreateStrategy, RESTUpdateStrategy, and
// RESTDeleteStrategy are generic across all backends, and encapsulate logic
// specific to the API.
//
// TODO: make the default exposed methods exactly match a generic RESTStorage
type Store struct {
	// NewFunc returns a new instance of the type this registry returns for a
	// GET of a single object, e.g.:
	//
	// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
	NewFunc func() runtime.Object

	// NewListFunc returns a new list of the type this registry; it is the
	// type returned when the resource is listed, e.g.:
	//
	// curl GET /apis/group/version/namespaces/my-ns/myresource
	NewListFunc func() runtime.Object
	...
}
注册了 CRD 对应的 Store 后,即可实现其对应的 CRUD 方法,例如查询某个 CRD 对象对应的 api 是:
1 2
// GET /apis/group/version/namespaces/my-ns/myresource/name-of-object GET /apis/apiextensions.k8s.io/v1beta1/namespaces/${namespace}/crd/${crd-name}
// InstallAPIGroup exposes the given api group in the API.
// The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
// underlying storage will be destroyed on this servers shutdown.
//
// It is a single-group convenience wrapper: it calls InstallAPIGroups with
// apiGroupInfo as the only argument and returns that call's error.
func(s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
	return s.InstallAPIGroups(apiGroupInfo)
}
func(s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error { ... // 注册APIGroup下的所有resource并注册 for _, apiGroupInfo := range apiGroupInfos { if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil { return fmt.Errorf("unable to install api resources: %v", err) }
// setup discovery // Install the version handler. // Add a handler at /apis/<groupName> to enumerate all versions supported by this group. apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{} for _, groupVersion := range apiGroupInfo.PrioritizedVersions { // Check the config to make sure that we elide versions that don't have any resources iflen(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 { continue } apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{ GroupVersion: groupVersion.String(), Version: groupVersion.Version, }) } preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{ GroupVersion: apiGroupInfo.PrioritizedVersions[0].String(), Version: apiGroupInfo.PrioritizedVersions[0].Version, } apiGroup := metav1.APIGroup{ Name: apiGroupInfo.PrioritizedVersions[0].Group, Versions: apiVersionsForDiscovery, PreferredVersion: preferredVersionForDiscovery, }
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container. // It is expected that the provided path root prefix will serve all operations. Root MUST NOT end // in a slash. func(g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) { prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version) installer := &APIInstaller{ group: g, prefix: prefix, minRequestTimeout: g.MinRequestTimeout, }
// APIInstaller bundles an APIGroupVersion with the path prefix and minimum
// request timeout used while registering that group version's resources.
type APIInstaller struct {
	group             *APIGroupVersion
	prefix            string // Path prefix where API resources are to be registered.
	minRequestTimeout time.Duration
}
func(c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) { ... if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil { returnnil, err } ... // InstallAPIs will install the APIs for the restStorageProviders if they are enabled. func(m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error { ... for _, restStorageBuilder := range restStorageProviders { groupName := restStorageBuilder.GroupName() apiGroupInfo, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter) if err != nil { return fmt.Errorf("problem initializing API group %q : %v", groupName, err) } ...
iflen(groupName) == 0 { // the legacy group for core APIs is special that it is installed into /api via this special install method. // DefaultLegacyAPIPrefix 也就是 /api 前缀 if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { return fmt.Errorf("error in registering legacy API: %w", err) } } else { // 将其他的注入到 /apis 前缀里面 // everything else goes to /apis nonLegacy = append(nonLegacy, &apiGroupInfo) } } // 注册 /apis if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil { return fmt.Errorf("error in registering group versions: %v", err) } returnnil }