点击上方蓝字关注我们
目前 37 网游所有 ToC 的业务都已经从云主机迁移到 k8s 集群,接下来需要将后台性质的业务也迁移到 k8s 集群,保证架构的统一,便于日常管理和维护。但是后台性质的业务往往伴有异步任务(常驻或定时),那么如何将异步任务顺利迁移到 k8s 集群并将它们管理起来是我们面临的首要问题。
在设计方案之前,需要搞清楚我们对于异步任务管理平台的需求到底是什么?
需要基于 k8s 进行任务调度,系统稳定且高可用。理想状态下,任务管理平台挂了,异步任务依旧能正常运行。
实现对异步任务管理(启动,停止,更新,删除)的基础上,还能够进行自定义的配置。
实现对异步任务附带的业务属性进行管理(任务描述、负责人以及操作权限等)。
实现对任务运行状态的观测数据支持以及监控告警。
弄清需求之后,我们选择了集团内部做的比较好的相关产品以及云产品进行方案调研。
我们选择做的比较好的产品做了方案调研,分别是:A云厂商的任务调度产品以及某内部产品任务调度平台,详细情况如下:
A云厂商的任务调度产品 作为云产品,质量肯定有保证。但由于我们的业务主流量在腾讯云上,如果该产品服务挂了,需要 SRE 手动切换导入相应的 yaml 文件,而且它缺失一些业务属性相关的管理功能,不符合我们的需求。
某内部产品任务调度平台生产环境平稳运行很久,质量也是有保证。但由于目前只支持部署在云主机上,我们需要的是基于 k8s 的任务调度平台,因此也不符合我们的需求。
基于以上的方案调研,我们最终采用了自研的方案,对比如下:
通过自研 kjob 任务调度平台 ,可获得收益如下:
为技术中心提供一个统一好用的异步任务管理平台,减轻运维人员工作负担。
将 web 服务和异步任务的架构进行统一,方便管理和维护。
借助 Kubernetes 平台能力,实现对资源的合理调配以及多种任务调度方式的支持。
支持定时任务和异步任务,并提供运行任务的观测能力,及时了解任务的运行情况。
下面将从 kjob 任务调度平台的整体结构和系统架构两方面来介绍实现方案。
从上图可知, kjob 任务调度平台主要分为两个模块:管理后台以及部署在 k8s 的 agent(命名为kjob)。
管理后台:负责任务任务的分发、任务的操作管理以及权限管理等。
kjob:负责将任务信息进行组合转换成 k8s 对应的对象(定时任务对应 cronjob,常驻任务对应 deployment),并提供对 k8s 中任务的运行状态获取以及监控告警。
本系统使用 TCF 框架构建管理后台,操作流转如下:
用户在管理后台进行任务操作。
管理后台通过 HTTP 将任务分发到各个 k8s 集群。
kjob 接受到请求后,将信息进行组合,通过 k8s API 管理对应的资源。
接下来,我将对 kjob 任务调度平台的核心亮点功能进行介绍,帮助大家更快地了解这个系统。
我们对定时任务和常驻任务运行在 k8s 的必填信息进行了筛选,尽可能减少用户填写的内容。最简的信息填写称为简单模式,下图分别是创建定时任务和常驻任务需要的填写项。
k8s 作为调度平台的底层给予了上层系统更多的配置可以自定义(比如资源限制、亲和性、环境变量、生命周期等),这种称为高级模式。
支持多种调度类型、并发任务数以及每个任务的超时时间,基本满足不同业务的各种需求。
其中调度类型支持三种,具体功能如下:
随着业务站点的增加和迭代开发,保存在云上的镜像仓库中的镜像数量也越来越多(地址还长),打通云上镜像仓库支持搜索和选择,可以方便用户进行选择,而不是填写很长且不好记的镜像地址,减轻用户负担。
任务导入设计目的是为了将已经在 k8s 集群运行的 cronjob 和 deployment 交给 kjob 任务调度平台接管。用户只需要将通过 kubectl 导出的对应资源的 json 文件上传,就可以创建对应的任务并优雅接管,使得程序不用中断和重启。
在使用过程中,发现了一个额外用法,可以辅助简化任务的创建。一般情况下 web 服务和异步任务都是同一个站点代码,web 服务使用的是 deployment,部署异步任务使用的是同一份代码,但需要 cronjob 来承载。我们只需要将 web 服务的 json 文件导出,然后通过上述功能导入,最后在编辑页面将类型改成定时任务,就可以减少很多需要配置的选项。比如下图中的配置都可以通过导入功能简化填写:
与传统的任务调度平台不同,基于 k8s 的任务调度平台的设计重点不是实现一个很好的调度模块(k8s 的调度已经足够牛逼了),而是如何更好地与 k8s 交互来实现功能。基于对异步任务的操作需求(创建、更新、删除、获取以及操作日志),定义调度模块的接口功能如下:
// IScheduler 是调度器的接口类型,包括对定时任务和常驻任务的操作定义
type IScheduler interface {
CreateJob(ctx context.Context, entry JobEntry) error
UpdateJob(ctx context.Context, entry JobEntry, all ...bool) error
DeleteJob(ctx context.Context, entry JobEntry) error
GetJobs(ctx context.Context, queries []QueryEntry) (map[string]ReportResult, error)
GetExecutionLog(ctx context.Context, query QueryEntry) ([]ExecutionLog, error)
createCronJob(ctx context.Context, entry JobEntry) error
createDaemonJob(ctx context.Context, entry JobEntry) error
updateCronJob(ctx context.Context, entry JobEntry, all ...bool) error
updateDaemonJob(ctx context.Context, entry JobEntry, all ...bool) error
deleteCronJob(ctx context.Context, entry JobEntry) error
deleteDaemonJob(ctx context.Context, entry JobEntry) error
getCronJobs(ctx context.Context, query QueryEntry) (ReportResult, error)
getDaemonJobs(ctx context.Context, query QueryEntry) (ReportResult, error)
getCronJobExecutionLog(ctx context.Context, query QueryEntry) ([]ExecutionLog, error)
getDaemonJobExecutionLog(ctx context.Context, query QueryEntry) ([]ExecutionLog, error)
}
与 k8s 交互过程中,创建、查询与删除都不复杂,但是需要特别注意更新资源对象的操作,特别是并发更新资源会导致任务异常。因此,需要通过 k8s client-go 中的 RetryOnConflict 函数实现冲突重试,最终保证资源的安全更新。代码如下:
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// 更新逻辑
})
更新资源过程中,一般会根据资源对象的 name 字段先从 k8s 中查到已有的对象,然后更新相应结构体字段,最后调用 Update 函数进行更新。具体如下:
Update(ctx context.Context, cronJob *v1.CronJob, opts metav1.UpdateOptions) (*v1.CronJob, error)
但是当资源对象支持更新的字段很多时,就需要逐个对比哪些字段需要更新并进行赋值,这是一个非常麻烦的事情。
更好的方式就是根据用户提交上来的信息,重新创建一个资源对象,然后调用 Update 函数进行 全量更新。但你会发现更新不生效,这是为什么呢?
因为 k8s 更新资源的时候,需要通过 resourceVersion 字段来做更新请求的版本控制,而重新创建的对象的 resourceVersion 是空值。这时就需要将原本的资源对象查出来,然后将它的 ObjectMeta 结构直接赋值给新创建的对象就可以更新成功了。
k8s 中对应资源对象的状态的定义是比较复杂的,deployment 和 cronjob 也不相同。但是对于用户来说,希望任务的状态是统一且简单的,便于维护和管理。因此,我们对于任务的状态定义了四种状态:待启动、运行中、异常、停止。那关键就是如何将资源对象的状态转换成我们定义的任务状态。
其中,最复杂的就是判断 cronjob 资源对象的状态,因为 cronjob 对象创建 job 对象,job 对象创建 pod 对象,最终的任务执行是在 pod 对象。因此,需要按照下图的逻辑一层一层进行判断才能最终判断出任务的状态。
虽然任务的状态判断逻辑略微有些复杂,但带来的好处就是告警的准确性,一旦出现异常不仅能够告警通知哪个任务异常,还能够将异常的直接原因告警出来,便于负责人立马定位问题,告警信息如下:
具体到哪个 cronjob 下面的哪个 job 对应的哪个 pod 出现了什么问题都是可以感知到的,非常好用。
传统的任务调度平台的实现是 agent 接收调度中心发送的执行命令,然后拉起对应的进程执行任务,这种模式的好处就是可以立马感知到是否启动成功,如果失败了也能获取到对应的原因。
kjob 是使用 deployment 部署在 k8s 集群,上图中有3个副本,这是为保证 kjob 的可用性。
监控的数据来源是数据库,为要保证数据的统一,需要使用分布式锁保证每次只有一副本执行从数据库中批量获取数据操作。
将获取的数据投递到一个 channel 中,然后使用一定数据量(可配置)的 goroutine 从 channel 中获取任务信息,检查资源状态。
上述流程中,主要是用了 Go 并发模式中的 pipeline fan-out 模式,保证监控任务的快速执行的同时,控制 goroutine 的数量,确保资源不会被过度使用。
关键代码如下:
func (s *StatusMonitor) CheckJobs() {
jobEntryChan := s.walkSourceData()
errResultChan := make(chan scheduler.ReportResult)
var wg sync.WaitGroup
wg.Add(s.parallelism)
for i := 0; i < s.parallelism; i++ {
go func() {
s.checkStatus(jobEntryChan, errResultChan)
wg.Done()
}()
}
go func() {
wg.Wait()
close(errResultChan)
}()
for r := range errResultChan {
// 判断是否需要真的告警(避免"pod初始化未完成时与异常的状态是一致的"的问题)
if s.needAlert(r) {
for _, c := range channel.GetAllChannel() {
// 记录告警日志
s.log(r)
// 推送告警消息
c.Send(r)
}
}
}
}
func (s *StatusMonitor) checkStatus(jobEntryChan <-chan scheduler.JobEntry, errResultChan chan<- scheduler.ReportResult) {
for job := range jobEntryChan {
// 停止的 job 不进行监控
jobStatus := scheduler.Status(job.Status)
if jobStatus == scheduler.StatusStop || jobStatus == scheduler.StatusNew {
continue
}
reports, err := s.jobScheduler.GetJobs(context.Background(), []scheduler.QueryEntry{{
JobName: job.Name,
JobType: job.JobType,
Cluster: job.Cluster,
Namespace: job.Namespace,
}})
if err != nil {
s.logger.Errorf(context.Background(), "status monitor check job[%s] status in jobScheduler.GetJobs, err: %v", job.Name, err)
continue
}
result, ok := reports[job.Name]
if !ok {
s.logger.Errorf(context.Background(), "status monitor check job[%s] status in jobScheduler.GetJobs, err: %v", job.Name, errors.New("no job name in result"))
continue
}
// 以集群中的 job 状态为准,修改 db 中的状态
if jobStatus != result.JobStatus {
_, err = dao.SchedulingJobs.Ctx(context.Background()).Data(g.Map{"STATUS": int(result.JobStatus)}).Where(dao.SchedulingJobs.Columns().NAME, job.Name).Update()
if err != nil {
s.logger.Errorf(context.Background(), "status monitor check job[%s] status in update db, err: %v", job.Name, err)
continue
}
}
// 如果是异常的 job 状态,则投递给 notify channel
if result.JobStatus == scheduler.StatusBad {
select {
case errResultChan <- result:
case <-s.done:
return
}
}
}
本文对 kjob 任务调度平台的整体功能与设计进行了介绍。
同时介绍了与 k8s 交互中的注意点、如何准确判断任务状态的方法以及 pipeline fan-out 模式在监控模块的设计。
未来需要提升 kjob 任务调度平台在运行数据观测方向的能力,同时增加 webhook 与用户的发布系统联通,实现发单同时可以同步更新相应的异步任务。
扫码关注 了解更多