作者:蔡锡生,LStack 平台研发工程师,近期专注于基于 OAM 的应用托管平台落地。
背景介绍
KubeSphere 应用商店简介
作为一个开源的、以应用为中心的容器平台,KubeSphere 在 OpenPitrix 的基础上,为用户提供了一个基于 Helm 的应用商店,用于应用生命周期管理。OpenPitrix 是一个开源的 Web 平台,用于打包、部署和管理不同类型的应用。KubeSphere 应用商店让 ISV、开发者和用户能够在一站式服务中只需点击几下就可以上传、测试、部署和发布应用。
KubeSphere 中的 Helm 仓库功能
- KubeSphere Helm 仓库中的应用模版查询
Helm 仓库简介
Helm charts 是存放 K8s 应用模版的仓库,该仓库由 index.yaml
文件和 .tgz
模版包组成。
[root@ningbo stable]# ls -al
总用量 400
drwxr-xr-x. 26 root root 4096 6月 22 17:01 .
drwxr-xr-x. 4 root root 86 6月 22 16:37 ..
-rw-r--r--. 1 root root 10114 6月 22 17:12 index.yaml
-rw-r--r--. 1 root root 3803 6月 8 2020 lsh-cluster-csm-om-agent-0.1.0.tgz
-rw-r--r--. 1 root root 4022 6月 8 2020 lsh-mcp-cc-alert-service-0.1.0.tgz
-rw-r--r--. 1 root root 4340 6月 8 2020 lsh-mcp-cc-sms-service-0.1.0.tgz
-rw-r--r--. 1 root root 4103 6月 8 2020 lsh-mcp-cpm-metrics-exchange-0.1.0.tgz
-rw-r--r--. 1 root root 4263 6月 8 2020 lsh-mcp-cpm-om-service-0.1.0.tgz
-rw-r--r--. 1 root root 4155 6月 8 2020 lsh-mcp-csm-om-service-0.1.0.tgz
-rw-r--r--. 1 root root 3541 6月 8 2020 lsh-mcp-deploy-service-0.1.0.tgz
-rw-r--r--. 1 root root 5549 6月 8 2020 lsh-mcp-iam-apigateway-service-0.1.0.tgz
apiVersion: v1
entries:
aliyun-ccm:
- apiVersion: v2
appVersion: addon
created: "2021-06-21T08:59:58Z"
description: A Helm chart for Kubernetes
digest: 6bda563c86333475255e5edfedc200ae282544e2c6e22b519a59b3c7bdef9a32
name: aliyun-ccm
type: application
urls:
- charts/aliyun-ccm-0.1.0.tgz
version: 0.1.0
aliyun-csi-driver:
- apiVersion: v2
appVersion: addon
created: "2021-06-21T08:59:58Z"
description: A Helm chart for Kubernetes
digest: b49f128d7a49401d52173e6f58caedd3fabbe8e2827dc00e6a824ee38860fa51
name: aliyun-csi-driver
type: application
urls:
- charts/aliyun-csi-driver-0.1.0.tgz
version: 0.1.0
application-controller:
- apiVersion: v1
appVersion: addon
created: "2021-06-21T08:59:58Z"
description: A Helm chart for application Controller
digest: 546e72ce77f865683ce0ea75f6e0203537a40744f2eb34e36a5bd378f9452bc5
name: application-controller
urls:
- charts/application-controller-0.1.0.tgz
version: 0.1.0
[root@ningbo stable]# cd mysql/
[root@ningbo mysql]# ls -al
总用量 20
drwxr-xr-x. 3 root root 97 5月 25 2020 .
drwxr-xr-x. 26 root root 4096 6月 22 17:01 ..
-rwxr-xr-x. 1 root root 106 5月 25 2020 Chart.yaml
-rwxr-xr-x. 1 root root 364 5月 25 2020 .Helmignore
-rwxr-xr-x. 1 root root 76 5月 25 2020 index.yaml
drwxr-xr-x. 3 root root 146 5月 25 2020 templates
-rwxr-xr-x. 1 root root 1735 5月 25 2020 values.yaml
[root@ningbo mysql]# cat Chart.yaml
apiVersion: v1
appVersion: "1.0"
description: A Helm chart for Kubernetes
name: mysql
version: 0.1.0
添加 Helm 仓库代码介绍
接口实现分析
- 路由注册
- handler,参数解析,调用 models 方面
- models ,调用 models 方法
- crd client,调用 K8s api 存储
webservice.Route(webservice.POST("/repos").
To(handler.CreateRepo).
Doc("Create a global repository, which is used to store package of app").
Metadata(restfulspec.KeyOpenAPITags, []string{constants.OpenpitrixTag}).
Param(webservice.QueryParameter("validate", "Validate repository")).
Returns(http.StatusOK, api.StatusOK, openpitrix.CreateRepoResponse{}).
Reads(openpitrix.CreateRepoRequest{}))
func (h *openpitrixHandler) CreateRepo(req *restful.Request, resp *restful.Response) {
createRepoRequest := &openpitrix.CreateRepoRequest{}
err := req.ReadEntity(createRepoRequest)
if err != nil {
klog.V(4).Infoln(err)
api.HandleBadRequest(resp, nil, err)
return
}
createRepoRequest.Workspace = new(string)
*createRepoRequest.Workspace = req.PathParameter("workspace")
user, _ := request.UserFrom(req.Request.Context())
creator := ""
if user != nil {
creator = user.GetName()
}
parsedUrl, err := url.Parse(createRepoRequest.URL)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
userInfo := parsedUrl.User
parsedUrl.User = nil
repo := v1alpha1.HelmRepo{
ObjectMeta: metav1.ObjectMeta{
Name: idutils.GetUuid36(v1alpha1.HelmRepoIdPrefix),
Annotations: map[string]string{
constants.CreatorAnnotationKey: creator,
},
Labels: map[string]string{
constants.WorkspaceLabelKey: *createRepoRequest.Workspace,
},
},
Spec: v1alpha1.HelmRepoSpec{
Name: createRepoRequest.Name,
Url: parsedUrl.String(),
SyncPeriod: 0,
Description: stringutils.ShortenString(createRepoRequest.Description, 512),
},
}
if strings.HasPrefix(createRepoRequest.URL, "https://") || strings.HasPrefix(createRepoRequest.URL, "http://") {
if userInfo != nil {
repo.Spec.Credential.Username = userInfo.Username()
repo.Spec.Credential.Password, _ = userInfo.Password()
}
} else if strings.HasPrefix(createRepoRequest.URL, "s3://") {
cfg := v1alpha1.S3Config{}
err := json.Unmarshal([]byte(createRepoRequest.Credential), &cfg)
if err != nil {
api.HandleBadRequest(resp, nil, err)
return
}
repo.Spec.Credential.S3Config = cfg
}
var result interface{}
result, err = h.openpitrix.ValidateRepo(createRepoRequest.URL, &repo.Spec.Credential)
if err != nil {
klog.Errorf("validate repo failed, err: %s", err)
api.HandleBadRequest(resp, nil, err)
return
}
validate, _ := strconv.ParseBool(req.QueryParameter("validate"))
if !validate {
if repo.GetTrueName() == "" {
api.HandleBadRequest(resp, nil, fmt.Errorf("repo name is empty"))
return
}
result, err = h.openpitrix.CreateRepo(&repo)
}
if err != nil {
klog.Errorln(err)
handleOpenpitrixError(resp, err)
return
}
resp.WriteEntity(result)
}
func (c *repoOperator) CreateRepo(repo *v1alpha1.HelmRepo) (*CreateRepoResponse, error) {
name := repo.GetTrueName()
items, err := c.repoLister.List(labels.SelectorFromSet(map[string]string{constants.WorkspaceLabelKey: repo.GetWorkspace()}))
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("list Helm repo failed: %s", err)
return nil, err
}
for _, exists := range items {
if exists.GetTrueName() == name {
klog.Error(repoItemExists, "name: ", name)
return nil, repoItemExists
}
}
repo.Spec.Description = stringutils.ShortenString(repo.Spec.Description, DescriptionLen)
_, err = c.repoClient.HelmRepos().Create(context.TODO(), repo, metav1.CreateOptions{})
if err != nil {
klog.Errorf("create Helm repo failed, repo_id: %s, error: %s", repo.GetHelmRepoId(), err)
return nil, err
} else {
klog.V(4).Infof("create Helm repo success, repo_id: %s", repo.GetHelmRepoId())
}
return &CreateRepoResponse{repo.GetHelmRepoId()}, nil
}
func (c *HelmRepos) Create(ctx context.Context, HelmRepo *v1alpha1.HelmRepo, opts v1.CreateOptions) (result *v1alpha1.HelmRepo, err error) {
result = &v1alpha1.HelmRepo{}
err = c.client.Post().
Resource("Helmrepos").
VersionedParams(&opts, scheme.ParameterCodec).
Body(HelmRepo).
Do(ctx).
Into(result)
return
}
查询 Helm 仓库应用模版代码介绍
接口实现
- 路由注册
- handler,参数解析,调用 models 方面
- models ,调用 models 方法
- crd client,调用 K8s api 存储
webservice.Route(webservice.GET("/apps").LiHui, 6 months ago: • openpitrix crd
Deprecate().
To(handler.ListApps).
Doc("List app templates").
Param(webservice.QueryParameter(params.ConditionsParam, "query conditions,connect multiple conditions with commas, equal symbol for exact query, wave symbol for fuzzy query e.g. name~a").
Required(false).
DataFormat("key=%s,key~%s")).
Param(webservice.QueryParameter(params.PagingParam, "paging query, e.g. limit=100,page=1").
Required(false).
DataFormat("limit=%d,page=%d").
DefaultValue("limit=10,page=1")).
Param(webservice.QueryParameter(params.ReverseParam, "sort parameters, e.g. reverse=true")).
Param(webservice.QueryParameter(params.OrderByParam, "sort parameters, e.g. orderBy=createTime")).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.OpenpitrixTag}).
Returns(http.StatusOK, api.StatusOK, models.PageableResponse{}))
func (h *openpitrixHandler) ListApps(req *restful.Request, resp *restful.Response)
limit, offset := params.ParsePaging(req)
orderBy := params.GetStringValueWithDefault(req, params.OrderByParam, openpitrix.CreateTime)
reverse := params.GetBoolValueWithDefault(req, params.ReverseParam, false)
conditions, err := params.ParseConditions(req)
if err != nil {
klog.V(4).Infoln(err)
api.HandleBadRequest(resp, nil, err)
return
}
if req.PathParameter("workspace") != "" {
conditions.Match[openpitrix.WorkspaceLabel] = req.PathParameter("workspace")
}
result, err := h.openpitrix.ListApps(conditions, orderBy, reverse, limit, offset)
if err != nil {
klog.Errorln(err)
handleOpenpitrixError(resp, err)
return
}
resp.WriteEntity(result)
}
func (c *applicationOperator) ListApps(conditions *params.Conditions, orderBy string, reverse bool, limit, offset int) (*models.PageableResponse, error) {
apps, err := c.listApps(conditions)
if err != nil {
klog.Error(err)
return nil, err
}
apps = filterApps(apps, conditions)
if reverse {
sort.Sort(sort.Reverse(HelmApplicationList(apps)))
} else {
sort.Sort(HelmApplicationList(apps))
}
totalCount := len(apps)
start, end := (&query.Pagination{Limit: limit, Offset: offset}).GetValidPagination(totalCount)
apps = apps[start:end]
items := make([]interface{}, 0, len(apps))
for i := range apps {
versions, err := c.getAppVersionsByAppId(apps[i].GetHelmApplicationId())
if err != nil && !apierrors.IsNotFound(err) {
return nil, err
}
ctg, _ := c.ctgLister.Get(apps[i].GetHelmCategoryId())
items = append(items, convertApp(apps[i], versions, ctg, 0))
}
return &models.PageableResponse{Items: items, TotalCount: totalCount}, nil
}
func (c *applicationOperator) listApps(conditions *params.Conditions) (ret []*v1alpha1.HelmApplication, err error) {
repoId := conditions.Match[RepoId]
if repoId != "" && repoId != v1alpha1.AppStoreRepoId {
if ret, exists := c.cachedRepos.ListApplicationsByRepoId(repoId); !exists {
klog.Warningf("load repo failed, repo id: %s", repoId)
return nil, loadRepoInfoFailed
} else {
return ret, nil
}
} else {
if c.backingStoreClient == nil {
return []*v1alpha1.HelmApplication{}, nil
}
ret, err = c.appLister.List(labels.SelectorFromSet(buildLabelSelector(conditions)))
}
return
}
func (c *cachedRepos) ListApplicationsByRepoId(repoId string) (ret []*v1alpha1.HelmApplication, exists bool) {
c.RLock()
defer c.RUnlock()
if repo, exists := c.repos[repoId]; !exists {
return nil, false
} else {
ret = make([]*v1alpha1.HelmApplication, 0, 10)
for _, app := range c.apps {
if app.GetHelmRepoId() == repo.Name {
ret = append(ret, app)
}
}
}
return ret, true
}
既然 app template 是从缓存中获取的,那么缓存中的数据又是什么时候录入的呢?
- 创建全局缓存变量
- 添加新 Helm 仓库,K8s 中已安装 crd 控制器
HelmRepoController
发现有新的 HelmRepo 创建,更新 .Status.Data
内容
- informer 发现有更新,同时更新缓存
缓存更新的实现
- 创建全局变量,通过 init 函数初始化
- 通过 HelmRepo 的 informer 实现缓存同步更新
- 在每次调用接口的时候,hanlder 类中包换了缓存变量
创建接口类 openpitrix.Interface
type openpitrixHandler struct {
openpitrix openpitrix.Interface
}
func newOpenpitrixHandler(ksInformers informers.InformerFactory, ksClient versioned.Interface, option *openpitrixoptions.Options) *openpitrixHandler {
var s3Client s3.Interface
if option != nil && option.S3Options != nil && len(option.S3Options.Endpoint) != 0 {
var err error
s3Client, err = s3.NewS3Client(option.S3Options)
if err != nil {
klog.Errorf("failed to connect to storage, please check storage service status, error: %v", err)
}
}
return &openpitrixHandler{
openpitrix.NewOpenpitrixOperator(ksInformers, ksClient, s3Client),
}
}
NewOpenpitrixOperator
- 通过在 informer 中添加通知函数,执行缓存更新
- once.Do 只执行一次
var cachedReposData reposcache.ReposCache
var HelmReposInformer cache.SharedIndexInformer
var once sync.Once
func init() {
cachedReposData = reposcache.NewReposCache()
}
func NewOpenpitrixOperator(ksInformers ks_informers.InformerFactory, ksClient versioned.Interface, s3Client s3.Interface) Interface {
once.Do(func() {
klog.Infof("start Helm repo informer")
HelmReposInformer = ksInformers.KubeSphereSharedInformerFactory().Application().V1alpha1().HelmRepos().Informer()
HelmReposInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
r := obj.(*v1alpha1.HelmRepo)
cachedReposData.AddRepo(r)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldR := oldObj.(*v1alpha1.HelmRepo)
cachedReposData.DeleteRepo(oldR)
r := newObj.(*v1alpha1.HelmRepo)
cachedReposData.AddRepo(r)
},
DeleteFunc: func(obj interface{}) {
r := obj.(*v1alpha1.HelmRepo)
cachedReposData.DeleteRepo(r)
},
})
go HelmReposInformer.Run(wait.NeverStop)
})
return &openpitrixOperator{
AttachmentInterface: newAttachmentOperator(s3Client),
ApplicationInterface: newApplicationOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient, s3Client),
RepoInterface: newRepoOperator(cachedReposData, ksInformers.KubeSphereSharedInformerFactory(), ksClient),
ReleaseInterface: newReleaseOperator(cachedReposData, ksInformers.KubernetesSharedInformerFactory(), ksInformers.KubeSphereSharedInformerFactory(), ksClient),
CategoryInterface: newCategoryOperator(ksInformers.KubeSphereSharedInformerFactory(), ksClient),
}
}
缓存更新逻辑
type cachedRepos struct {
sync.RWMutex
chartsInRepo map[workspace]map[string]int
repoCtgCounts map[string]map[string]int
repos map[string]*v1alpha1.HelmRepo
apps map[string]*v1alpha1.HelmApplication
versions map[string]*v1alpha1.HelmApplicationVersion
}
- ByteArrayToSavedIndex:将
repo.Status.Data
转换为 SavedIndex 数组对象
- 遍历 SavedIndex.Applications
- 保存(app.ApplicationId:HelmApplication)到 cachedRepos.apps
func (c *cachedRepos) AddRepo(repo *v1alpha1.HelmRepo) error {
return c.addRepo(repo, false)
}
func (c *cachedRepos) addRepo(repo *v1alpha1.HelmRepo, builtin bool) error {
if len(repo.Status.Data) == 0 {
return nil
}
index, err := Helmrepoindex.ByteArrayToSavedIndex([]byte(repo.Status.Data))
if err != nil {
klog.Errorf("json unmarshal repo %s failed, error: %s", repo.Name, err)
return err
}
...
chartsCount := 0
for key, app := range index.Applications {
if builtin {
appName = v1alpha1.HelmApplicationIdPrefix + app.Name
} else {
appName = app.ApplicationId
}
HelmApp := v1alpha1.HelmApplication{
....
}
c.apps[app.ApplicationId] = &HelmApp
var ctg, appVerName string
var chartData []byte
for _, ver := range app.Charts {
chartsCount += 1
if ver.Annotations != nil && ver.Annotations["category"] != "" {
ctg = ver.Annotations["category"]
}
if builtin {
appVerName = base64.StdEncoding.EncodeToString([]byte(ver.Name + ver.Version))
chartData, err = loadBuiltinChartData(ver.Name, ver.Version)
if err != nil {
return err
}
} else {
appVerName = ver.ApplicationVersionId
}
version := &v1alpha1.HelmApplicationVersion{
....
}
c.versions[ver.ApplicationVersionId] = version
}
....
}
return nil
}
HelmRepo 协调器
HelmRepo.Status.Data 加载流程
- LoadRepoIndex: convert index.yaml to IndexFile
- MergeRepoIndex: merge new and old IndexFile
- savedIndex.Bytes(): compress data with zlib.NewWriter
- 将 savedIndex 数据存入 CRD(HelmRepo.Status.Data)
关键结构体
type SavedIndex struct {
APIVersion string `json:"apiVersion"`
Generated time.Time `json:"generated"`
Applications map[string]*Application `json:"apps"`
PublicKeys []string `json:"publicKeys,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
}
type IndexFile struct {
APIVersion string `json:"apiVersion"`
Generated time.Time `json:"generated"`
Entries map[string]ChartVersions `json:"entries"`
PublicKeys []string `json:"publicKeys,omitempty"`
}
代码位置
func (r *ReconcileHelmRepo) syncRepo(instance *v1alpha1.HelmRepo) error {
index, err := Helmrepoindex.LoadRepoIndex(context.TODO(), instance.Spec.Url, &instance.Spec.Credential)
if err != nil {
klog.Errorf("load index failed, repo: %s, url: %s, err: %s", instance.GetTrueName(), instance.Spec.Url, err)
return err
}
existsSavedIndex := &Helmrepoindex.SavedIndex{}
if len(instance.Status.Data) != 0 {
existsSavedIndex, err = Helmrepoindex.ByteArrayToSavedIndex([]byte(instance.Status.Data))
if err != nil {
klog.Errorf("json unmarshal failed, repo: %s, error: %s", instance.GetTrueName(), err)
return err
}
}
savedIndex := Helmrepoindex.MergeRepoIndex(index, existsSavedIndex)
data, err := savedIndex.Bytes()
if err != nil {
klog.Errorf("json marshal failed, error: %s", err)
return err
}
instance.Status.Data = string(data)
return nil
}
Question:
Q1:Helm 仓库发包时如何进行 Helm release 版本控制
A:修改 Charts.yaml
中的字段 version,然后 Helm package, 等于新增一个 .tgz
包,老版本的不要删除,这时候执行 index 的时候会吧所有的 .tgz
包包含在内。
$ Helm repo index stable --url=xxx.xx.xx.xxx:8081/
$ cat index.yaml
....
redis:
- apiVersion: v1
appVersion: "1.0"
created: "2021-06-22T16:34:58.286583012+08:00"
description: A Helm chart for Kubernetes
digest: fd7c0d962155330527c0a512a74bea33302fca940b810c43ee5f461b1013dbf5
name: redis
urls:
- xxx.xx.xx.xxx:8081/redis-0.1.1.tgz
version: 0.1.1
- apiVersion: v1
appVersion: "1.0"
created: "2021-06-22T16:34:58.286109049+08:00"
description: A Helm chart for Kubernetes
digest: 1a23bd6d5e45f9d323500bbe170011fb23bfccf2c1bd25814827eb8dc643d7f0
name: redis
urls:
- xxx.xx.xx.xxx:8081/redis-0.1.0.tgz
version: 0.1.0
Q2:KubeSphere 版本同步功能有缺失?用户添加完 Helm 仓库后,如果有新的应用发布,查询不到
A:解决方案:使用 3 种同步策略
- 定时同步 Helm 仓库(HelmRepo 设置一个定时协调的事件)
- 企业仓库,用户可以设置 hook,发布新版本的时候主动触发更新
- 用户主动更新 charts
Q3:index.yaml 缓存位置
A:某些仓库的 index.yaml 比较大,如果 1000 个用户,1000 个 charts 会太吃内存。建议常用 index.yaml 的放在内存中,不常用的 index.yaml 存储在本地磁盘。