Kubernetes API Server handler 注册过程分析

网友投稿 291 2023-05-19

作者|韩伟森,就职于中国移动云能力中心,专注于云原生领域。前言

K8s提供 Aggregated APIServer​ 的扩展方式,编写 Aggregated APIServer​ 本质上和K8s构建方式类似,理解 APiServer 资源的加载方式,能更好好的理解如何开发Aggregated APIServer。本文以内置资源的 handler 注册过程为线索介绍了 APiServer 的启动过程和 handler 注册过程。使用k8s代码commit id为c6970e64528ba78b74bf77b86f9b78b7b61bd0cd

APIServer启动过程介绍

图1 APIServer启动流程

图1给出了 ApiServer 的初始化流程,首先通过 CreateServerChain 构造出3个APIServer:

AggregatorServer:拦截Aggregated APIServer​ 中定义的资源对象请求,并转发给相关的Aggregated APIServer 处理。KubeAPIServer:用于处理 k8s 的内建资源,如:Deployment,ConfigMap 等。APIExtensionServer:负责处理用户自定义资源。

它们之间的处理顺序为如下图所示,当用户请求进来,先判断 AggregatorServer​ 能否处理,否则代理给 kubeApiServer​ ,如果 kubeApiServer​ 不能处代理给 ApiExtensionServer 处理,如果都不能处理则交给 notFoundHandler 处理。

图2 三种 APIServer 请求顺序

限于篇幅原因,本文主要分析 kubeapiserver 的启动过程。

CreateApiServerConfig​ 通过调用 buildGenericConfig​ 构建 genericapiserver.Config。genericapiserver.Config​ 中包含了启动Genericapiserver​ 所需要的配置信息,比如:RequestTimeout​ 定义了请求的超时时间,AdmissionControl​ 对象进行准入控制。buildGenericConfig​ 中需要注意的是 BuildHandlerChainFunc​,请求在路由给资源对象的handler前先经过的BuildHandlerChainFunc​ 中定义的 Filter​ 。参考图1,通过深入 buildGenericConfig​ 可以发现 BuildHandlerChainFunc​ 传入的是 DefaultBuildHandlerChain​ ,其中 Filter 先定义的后调用。

复制// k8s.io/apiserver/pkg/server/config.go func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler := filterlatency.TrackCompleted(apiHandler) // 构造权限检查filter handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer) ... // 构造认证filter handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences) ... // 构造请求超时filter, LongRunningFunc会判断该请求是否是需要LongRunning的,比如watch的请求,如果是,该filter不会对这类请求生效 // WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the // context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client. handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc) handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc, c.Serializer, c.RequestTimeout) handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup) ... // 初始化RequestInfo的filter并将其放入context中,后续的处理逻辑可以从context直接获取RequestInfo handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver) .... return handler }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.

CreateKubeAPIServer​ 中调用了kubeAPIServerConfig.Complete().New​构造出了 kubeAPIServer​ 的 GenericServer。kubeAPIServerConfig.Complete().New​中通过调用 m.InstallLegacyAPI​ 初始化核心资源并添加进路由中,对应的是以 api 开头的资源,如:Pod,ConfigMap 等。调用 m.InstallAPI 初始化以 apis 开头的内置资源如:Deployment。

handler的注册过程

从图1可以看出 InstallAPI​ 与 InstallLegacyAPI​ 的创建过程基本类似,本文主要介绍 InstallAPI 的初始化过程。

在调用 InstallAPI​ 之前kubeAPIServerConfig.Complete().New​会先创建内置资源对象的RESTStorageProvider​ 作为 InstallAPI 的入参

复制//pkg/controlplane/instance.go func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) { ... // 构造内置资源的RESTStorageProvider restStorageProviders := []RESTStorageProvider{ apiserverinternalrest.StorageProvider{}, authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences}, authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver}, autoscalingrest.RESTStorageProvider{}, batchrest.RESTStorageProvider{}, certificatesrest.RESTStorageProvider{}, coordinationrest.RESTStorageProvider{}, discoveryrest.StorageProvider{}, networkingrest.RESTStorageProvider{}, noderest.RESTStorageProvider{}, policyrest.RESTStorageProvider{}, rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer}, schedulingrest.RESTStorageProvider{}, storagerest.RESTStorageProvider{}, flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory}, // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names. // See https://github.com/kubernetes/kubernetes/issues/42392 appsrest.StorageProvider{}, admissionregistrationrest.RESTStorageProvider{}, eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL}, } if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil { return nil, err } ... }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.

RESTStorageProvider​ 是一个接口,通过其 NewRESTStorage​ 构造出 APIGroupInfo​ ,APIGroupInfo​ 包含注册资源所需的基本信息比如编解码器,组下所有资源的 Storage 对象VersionedResourcesStorageMap。

复制//k8s.io/apiserver/pkg/server/genericapiserver.go // Info about an API group. type APIGroupInfo struct { PrioritizedVersions []schema.GroupVersion // Info about the resources in this group. Its a map from version to resource to the storage. VersionedResourcesStorageMap map[string]map[string]rest.Storage ... // NegotiatedSerializer controls how this group encodes and decodes data NegotiatedSerializer runtime.NegotiatedSerializer // ParameterCodec performs conversions for query parameters passed to API calls ParameterCodec runtime.ParameterCodec ... }1.2.3.4.5.6.7.8.9.10.11.12.13.14.

VersionedResourcesStorageMap​ 需要重点注意,编写 Aggregated APIServer​主要逻辑是通过 NewDefaultAPIGroupInfo​ 初始化 APIGroupInfo​ 以后设置 VersionedResourcesStorageMap​ 属性。VersionedResourcesStorageMap​的签名是 map[string]map[string]rest.Storage​。第一个key是版本号,第二个key是资源名称,资源名称可以是 deployment 这种资源,同时也能是子资源如 pod/status​ , pod/log​ 等是pod的子资源有单独的storage。最终构建handler的请求路径是基于 VersionedResourcesStorageMap​ 中提供的版本号和资源名称确定的 。rest.Storage 用于处理具体的请求,其声明如下:

复制// k8s.io/apiserver/pkg/registry/rest/rest.go // Storage is a generic interface for RESTful storage services. // Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected // that objects may implement any of the below interfaces. type Storage interface { // New returns an empty object that can be used with Create and Update after request data has been put into it. // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) New() runtime.Object // Destroy cleans up its resources on shutdown. // Destroy has to be implemented in thread-safe way and be prepared // for being called more than once. Destroy() }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.

实现 rest.Storage​ 的接口最基本的,如果需要支持不同的请求,还需要实现其他的接口,相关定义在 k8s.io/apiserver/pkg/registry/rest/rest.go中,如:

复制// k8s.io/apiserver/pkg/registry/rest/rest.go // 资源对象支持POST请求,例入通过kubectl create一个资源对象。 // Creater is an object that can create an instance of a RESTful object. type Creater interface { // New returns an empty object that can be used with Create after request data has been put into it. // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) New() runtime.Object // Create creates a new version of a resource. Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) } // 资源对象支持GET请求,例如通过kubectl get 一个资源对象。 // Getter is an object that can retrieve a named RESTful resource. type Getter interface { // Get finds a resource in the storage by name and returns it. // Although it can return an arbitrary error value, IsNotFound(err) is true for the // returned error value err when the specified resource is not found. Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) } // 支持对资源对象进行watch操作 例如通过kubectl get 资源对象 -w。 type Watcher interface { // label selects on labels; field selects on the objects fields. Not all fields // are supported; an error should be returned if field tries to select on a field that // isnt supported. resourceVersion allows for continuing/starting a watch at a // particular version. Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.

后续的处理中会依据 Creater​ ,Getter​ 和 Watcher​ 等接口生成对应请求的handler,后文会进行具体的分析。k8s的内置资源存储都使用 etcd,因此内置资源的 Storage 是通过 Store​ 构建。Store​ 定义在 /k8s.io/apiserver/pkg/registry/generic/registry/store.go​文件中,已经实现 Creater​ , Getter​, Watcher​等接口,其他的资源只需在初始化 Store 时传入一些必须的参数即可,无需编写存储层的交互代码。下面给出了构造 deployment 的 store 的过程,其他内置资源大同小异。

复制// NewREST returns a RESTStorage object that will work against deployments. func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) { // 创建一个deployments的genericregistry.Store store := &genericregistry.Store{ // 初始化一个空资源对象,这里使用的是internal的版本,下面定义的各种strategy操作的对象也是internal版本,这样就不用为每一种版本编写一个strategy策略 NewFunc: func() runtime.Object { return &apps.Deployment{} }, // 初始化一个空资源对象列表 NewListFunc: func() runtime.Object { return &apps.DeploymentList{} }, DefaultQualifiedResource: apps.Resource("deployments"), // 创建更新删除策略 主要是做校验及控制那些字段不能被用户覆盖用 CreateStrategy: deployment.Strategy, UpdateStrategy: deployment.Strategy, DeleteStrategy: deployment.Strategy, ResetFieldsStrategy: deployment.Strategy, TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)}, } options := &generic.StoreOptions{RESTOptions: optsGetter} // 继续完成store其他属性的初始化,比如初始化store.Storage属性。Storage主要用于和底层存储层交互 if err := store.CompleteWithOptions(options); err != nil { return nil, nil, nil, err } statusStore := *store // deployment的status子资源也是使用store, 区别是更新策略不一样, 即在update时会用旧对象的spec和lable覆盖新对象的,防止非status字段被用户意外覆盖 statusStore.UpdateStrategy = deployment.StatusStrategy statusStore.ResetFieldsStrategy = deployment.StatusStrategy return &REST{store}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}, nil }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.

InstallAPIs​ 调用链条比较深。参考图1,最终会来到k8s.io/apiserver/pkg/endpoints/groupversion.go​的 InstallREST​ 方法。InstallREST​ 方法构造出 handler 的前缀,创建APIInstaller​,然后调用installer.Install()方法继续handler的注册

复制// k8s.io/apiserver/pkg/endpoints/groupversion.go func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) { // 从InstallAPI调用链下来这里的g.Root为/apis,这样就可以确定handler的前缀为/apis/{goup}/{version} prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version) installer := &APIInstaller{ group: g, prefix: prefix, minRequestTimeout: g.MinRequestTimeout, } apiResources, resourceInfos, ws, registrationErrors := installer.Install() versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources}) versionDiscoveryHandler.AddToWebService(ws) container.Add(ws) return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors) }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.

installer.Install()​ 方法会调用registerResourceHandlers​ 方法,真正开始创建和注册处理请求的 handler,需要说明的是a.group.Storage​ 是上文提到的VersionedResourcesStorageMap​ 传入版本号后获得的 map。读者可以自行参考图1的调用链进行分析。a.registerResourceHandlers​ 就是为每一种Storage注册handlers

复制// Install handlers for API resources. func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) { var apiResources []metav1.APIResource var resourceInfos []*storageversion.ResourceInfo var errors []error ws := a.newWebService() // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec. paths := make([]string, len(a.group.Storage)) var i int = 0 // a.goup.Storage的签名是 map[string]Storage, for循环的path是map的key,即资源名称 for path := range a.group.Storage { paths[i] = path i++ } sort.Strings(paths) for _, path := range paths { apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws) ... }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.

registerResourceHandlers​ 会依据rest.Storage实现的接口生成相关的action。最终根据action生成handler并注册到rest容器中。

复制// k8s.io/apiserver/pkg/endpoints/installer.go func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) { ... // 初始化rest容器,根目录是APIInstaller的prefix属性,从InstallAPI调用链下来值为/apis/{goup}/{version} ws := a.newWebService() ... // 进行类型转换判断当前的storage支持哪些类型的操作 creater, isCreater := storage.(rest.Creater) namedCreater, isNamedCreater := storage.(rest.NamedCreater) lister, isLister := storage.(rest.Lister) getter, isGetter := storage.(rest.Getter) getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions) gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter) collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter) updater, isUpdater := storage.(rest.Updater) patcher, isPatcher := storage.(rest.Patcher) watcher, isWatcher := storage.(rest.Watcher) connecter, isConnecter := storage.(rest.Connecter) storageMeta, isMetadata := storage.(rest.StorageMetadata) storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider) // Get the list of actions for the given scope. switch { case !namespaceScoped: // 构造有无namespace资源的action // Handle non-namespace scoped resources like nodes. ... default: // 构造有namespace资源的action // 构造handler的注册路径 namespaceParamName := "namespaces" // Handler for standard REST verbs (GET, PUT, POST and DELETE). namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string") namespacedPath := namespaceParamName + "/{namespace}/" + resource namespaceParams := []*restful.Parameter{namespaceParam} //resourcePath的值为 /namespaces/{namespace}/{resource} resourcePath := namespacedPath resourceParams := namespaceParams // itemPath: /namespaces/{namespace}/{resource}/{name} // name是请求资源对象的名字 itemPath := namespacedPath + "/{name}" nameParams := append(namespaceParams, nameParam) proxyParams := append(nameParams, pathParam) itemPathSuffix := "" if isSubresource { itemPathSuffix = "/" + subresource // 有子资源等情况下 resourcePath被定义为:/namespaces/{namespace}/{resource}/{name}/{subResource} itemPath = itemPath + itemPathSuffix // itemPath与resourcePath的值一样 resourcePath = itemPath resourceParams = nameParams } apiResource.Name = path apiResource.Namespaced = true apiResource.Kind = resourceKind namer := handlers.ContextBasedNaming{ Namer: a.group.Namer, ClusterScoped: false, } // 根据storage实现的接口添加添加相关的action actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister) actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater) actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter) // DEPRECATED in 1.11 actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList) actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter) if getSubpath { actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter) } actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater) actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher) actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter) // DEPRECATED in 1.11 actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher) actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter) actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath) // list or post across namespace. // For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods. // TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete) if !isSubresource { actions = appendIf(actions, action{"LIST", resource, params, namer, true}, isLister) // DEPRECATED in 1.11 actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer, true}, allowWatchList) } } ... for _, action := range actions { ... switch action.Verb { case "GET": // Get a resource. var handler restful.RouteFunction // 构造get请求的handler // restfulGetResourceWithOptions和restfulGetResource将handlers.GetResource函数转换成restful.RouteFunction,即handler的函数签名 if isGetterWithOptions { handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource) } else { handler = restfulGetResource(getter, reqScope) } ... // 将handler注册到rest容器中 // action.Path是上面定义的itemPath或resourcePath,对于GET来说是itemPath // 当前注册的handler的路径是ws的根路径加上ation.Path. 完整的路径为:/apis/{goup}/{version}/namespaces/{namespace}/{resource}/{name} route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If true, then the output is pretty printed.")). Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix). Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...). Returns(http.StatusOK, "OK", producedObject). Writes(producedObject) if isGetterWithOptions { if err := AddObjectParams(ws, route, versionedGetOptions); err != nil { return nil, nil, err } } addParams(route, action.Params) routes = append(routes, route) } case "LIST": // List all resources of a kind. ... case "PUT": // Update a resource. ... case "PATCH": // Partially update a resource ... case "POST": // Create a resource. ... case "DELETE": // Delete a resource. .... } ... }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.101.102.103.104.105.106.107.108.109.110.111.112.113.114.115.116.117.118.119.120.121.122.123.124.125.126.127.128.129.130.131.132.133.134.

registerResourceHandlers​ 中创建的handler并不是直接调用Creater​ ,Updater​等接口定义的方法,而是在外面包了一层代码进行一些额外的处理,例如对象的编解码,admission control 的处理逻辑,针对 watch 这种长链接需要进行协议的处理等,相关的定义在k8s.io/apiserver/pkg/endpoints/handlers包下。文本以Get和Create例,分析请求的处理逻辑。

Get请求的处理过程比较简单,通过请求的查询串构造出metav1.GetOptions ,然后交给 Getter 接口处理,最后在将查询结果进行转换发回给请求者。

复制// k8s.io/apiserver/pkg/endpoints/handlers/get.go // GetResource returns a function that handles retrieving a single resource from a rest.Storage object. func GetResource(r rest.Getter, scope *RequestScope) http.HandlerFunc { return getResourceHandler(scope, func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) { // check for export options := metav1.GetOptions{} // 获取查询串 if values := req.URL.Query(); len(values) > 0 { ... // 将查询串解码成metav1.GetOptions if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil { err = errors.NewBadRequest(err.Error()) return nil, err } } if trace != nil { trace.Step("About to Get from storage") } // 交给Getter接口处理 return r.Get(ctx, name, &options) }) } // getResourceHandler is an HTTP handler function for get requests. It delegates to the // passed-in getterFunc to perform the actual get. func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { ... namespace, name, err := scope.Namer.Name(req) ... ctx := req.Context() ctx = request.WithNamespace(ctx, namespace) ... result, err := getter(ctx, name, req, trace) ... // 对处理结果进行转化为用户期望的格式并写入到response中返回给用户 transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result) } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.

Create的处理逻辑在 createHandler 中,代码较长,主要做以下几件事情:

对查询串进行解码生成 metav1.CreateOptions 。对请求的body体中的数据进行解码,生成资源对象。解码的对象版本是 internal 版本,internal 版本是该资源对象所有版本字段的全集。针对不同版本的对象内部可以使用相同的代码进行处理。 对对象进行修改的准入控制,判断是否修需要修改对象。交给creater接口创建资源对象。将数据转换为期望的格式写入 response 中,调用 creater 接口返回的结果仍然是 internal 版本,编码时,会编码成用户请求的版本返回给用户。
复制// k8s.io/apiserver/pkg/endpoints/handlers/create.go // CreateNamedResource returns a function that will handle a resource creation with name. func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc { return createHandler(r, scope, admission, true) } func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { ... // 从request中取出请求body body, err := limitedReadBody(req, scope.MaxRequestBodyBytes) ... // 对查询传进行解码生成metav1.CreateOptions options := &metav1.CreateOptions{} values := req.URL.Query() if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil { ... } // 将请求body解码成资源对象, defaultGVK是用户请求的版本,这里decoder解码出来的对象是internal版本的对象 obj, gvk, err := decoder.Decode(body, &defaultGVK, original) ... admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo) // 构建调用create方法的函数 requestFunc := func() (runtime.Object, error) { return r.Create( ctx, name, obj, rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope), options, ) } // Dedup owner references before updating managed fields dedupOwnerReferencesAndAddWarning(obj, req.Context(), false) result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) { ... // 执行mutation的admission操作,即在创建时对象进行修改操作。 // admin在buildGenericConfig中初始化,通过config传递给genericsever,然后传递到此处 if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) { if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil { return nil, err } } // Dedup owner references again after mutating admission happens dedupOwnerReferencesAndAddWarning(obj, req.Context(), true) // 调用创建方法 result, err := requestFunc() ... return result, err }) ... // resutl也是internal版本的对象,transformResponseObject会转换为用户请求的版本并输出 transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result) }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.

Create请求的流程可以总结为下图

图3 create请求处理流程

总结

本文介绍了 K8s内置资源的注册过程,对APIServer的访问会先经过 filter,再路由给具体的 handler。filter 在 DefaultBuildHandlerChain​ 中定义,主要对请求做超时处理,认证,鉴权等操作。handler 的注册则是初始化 APIGoupInfo​ 并设置其 VersionedResourcesStorageMap​ 后作为入参,调用 GenericAPIServer.InstallAPIGroups​即可完成 handler 的注册。k8s.io/apiserver/pkg/endpoints/handlers​包中的代码则是对用户请求做编解码,对象版本转换,协议处理等操作,最后在交给rest.Storage 具体实现的接口进行处理。

参考

• https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kube_apiserver.html#kube-apiserver-处理流程[1]

• https://hackerain.me/2020/10/05/kubernetes/kube-apiserver-genericapiserver.html

• https://hackerain.me/2020/09/19/kubernetes/kube-apiserver-storage-overview.html

• https://github.com/gosoon/source-code-reading-notes/blob/master/kubernetes/kube_apiserver.md

• https://time.geekbang.org/column/article/41876

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:【重磅消息】OpenAI 开放的GPT3.5-Turbo,价格直接降了90%
下一篇:一文带你看透通用文字识别 OCR
相关文章

 发表评论

暂时没有评论,来抢沙发吧~