k8s-apiserver

说明

apiserver对外提供API服务,主要功能是验证处理REST请求,并更新保存在etcd中的相关对象。这篇文章以kubernetes1.5.3进行说明。

代码学习

apiserver的入口是cmd/kube-apiserver/apiserver.go
app.Run()的操作:

  • 参数校验
  • 加载插件
  • m, err := config.Complete().New()为httpserver设置路由
  • sharedInformers.Start(wait.NeverStop)启动informer,用于监听pods, nodes, namespaces等组件的事件
  • m.GenericAPIServer.PrepareRun().Run(wait.NeverStop)启动http server

k8s设置对象的默认值:pkg/api/<version>/defaults.go

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func SetDefaults_Container(obj *Container) {
if obj.ImagePullPolicy == "" {
// Ignore error and assume it has been validated elsewhere
_, tag, _, _ := parsers.ParseImageName(obj.Image)

// Check image tag

if tag == "latest" {
obj.ImagePullPolicy = PullAlways
} else {
obj.ImagePullPolicy = PullIfNotPresent
}
}
if obj.TerminationMessagePath == "" {
obj.TerminationMessagePath = TerminationMessagePathDefault
}
}

验证对象格式:pkg/api/validation/validation.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ValidateHasLabel requires that api.ObjectMeta has a Label with key and expectedValue
func ValidateHasLabel(meta api.ObjectMeta, fldPath *field.Path, key, expectedValue string) field.ErrorList {
allErrs := field.ErrorList{}
actualValue, found := meta.Labels[key]
if !found {
allErrs = append(allErrs, field.Required(fldPath.Child("labels").Key(key),
fmt.Sprintf("must be '%s'", expectedValue)))
return allErrs
}
if actualValue != expectedValue {
allErrs = append(allErrs, field.Invalid(fldPath.Child("labels").Key(key), meta.Labels,
fmt.Sprintf("must be '%s'", expectedValue)))
}
return allErrs
}

api url结构

prefix/group/version/…
例:
/apis/extensions/v1beta1/deployments,prefix=apis group=extensions version=v1beta1
/api/v1,prefix=api group="" version=v1

操作etcd

代码位置:pkg/storage,使用etcd客户端github.com/coreos/etcd/clientv3(v3)/github.com/coreos/etcd/client(v2)

pkg/api/rest/rest.go

声明各个interface,在registerResourceHandlers中做断言

registerResourceHandlers()为path注册handler

action path:namespaces/{namespace}/bindings, verb:POST
action path:componentstatuses, verb:LIST
action path:componentstatuses/{name}, verb:GET

type APIGroupVersion

1
2
3
type APIGroupVersion struct {
Storage map[string]rest.Storage
...

Storage的key(string)是各资源的path,value(rest.Storage)是操作此path的对象
下面的函数从apiGroupInfo.VersionedResourcesStorageMap中取得storage

1
2
3
4
5
6
7
8
9
10
func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion unversioned.GroupVersion, apiPrefix string) (*apiserver.APIGroupVersion, error) {
storage := make(map[string]rest.Storage)
for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
storage[strings.ToLower(k)] = v
}
version, err := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
version.Root = apiPrefix
version.Storage = storage
return version, err
}

type APIGroupInfo
APIGroupInfo.VersionedResourcesStorageMap保存group中各version的资源storage

1
2
3
4
5
6
// Info about an API group.
type APIGroupInfo struct {
GroupMeta apimachinery.GroupMeta
// Info about the resources in this group. Its a map from version to resource to the storage.
VersionedResourcesStorageMap map[string]map[string]rest.Storage
...

pkg/registry

代码中对这个包的说明是Package registry implements the storage and system logic for the core of the api server.
pkg/registry/core/rest/storage_core.go为例说明
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error)此函数中初始化资源与storage映射的map,部分代码:

1
2
3
4
5
6
7
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
...

pkg/registry/core/pod/etcd/etcd.goNewStorage()中变量storageInterface

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c completedConfig) New() (*Master, error) {
...
restOptionsFactory := restOptionsFactory{
deleteCollectionWorkers: c.DeleteCollectionWorkers,
enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
storageFactory: c.StorageFactory,
}

if c.EnableWatchCache {
restOptionsFactory.storageDecorator = registry.StorageWithCacher
} else {
restOptionsFactory.storageDecorator = generic.UndecoratedStorage
}
...
1
2
3
4
5
6
7
8
9
10
// NewRawStorage creates the low level kv storage. This is a work-around for current
// two layer of same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
s, d, err := factory.Create(*config)
if err != nil {
glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
}
return s, d
}

storage.Interface

pkg/storage/etcd/etcd_helper.go使用etcd v2 api,实现了storage.Interface
pkg/storage/etcd3/store.go使用etcd v3 api,实现了storage.Interface

编解码etcd存储的数据

pkg/genericapiserver/storage_factory.go NewStorageCodec()生成Codec

参考