//定义一个作业类,实现用户的业务逻辑
public class HelloJob implements Job {
......
实现业务逻辑
}
//根据作业类得到JobDetail
JobDetail jobDetail = JobBuilder.newJob(HelloJob.class)
//定义一个触发器,按照规定的时间调度作业
Trigger trigger = TriggerBuilder.newTrigger("每隔1分钟执行一次")
//根据作业类和触发器创建调度器
Scheduler scheduler = scheduler.scheduleJob(jobDetail,trigger);
//启动调度器,开始执行任务
scheduler .start()
public class MyElasticJob implements SimpleJob {
public void execute(ShardingContext context) {
//实现业务逻辑
......
}
// 对zookeeper进行设置,作为分布式任务的注册中心
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("xxxx"));
regCenter.init();
return regCenter;
}
//设置任务的执行频率、执行的类
private static LiteJobConfiguration createJobConfiguration() {
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
//主函数
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
}
public class JobScheduler {
public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
//作业配置
private final LiteJobConfiguration liteJobConfig;
//注册中心
private final CoordinatorRegistryCenter regCenter;
//调度器门面
private final SchedulerFacade schedulerFacade;
//作业门面
private final JobFacade jobFacade;
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
this.liteJobConfig = liteJobConfig;
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
/**
* 初始化作业.
*/
public void init() {
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfig.getTypeConfig().getJobClass()), liteJobConfig.getJobName());
JobRegistry.getInstance().registerJob(liteJobConfig.getJobName(), jobScheduleController, regCenter);
schedulerFacade.registerStartUpInfo(liteJobConfig);
jobScheduleController.scheduleJob(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
}
/**
* 注册作业启动信息.
*
* @param liteJobConfig 作业配置
*/
public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {
regCenter.addCacheData("/" + liteJobConfig.getJobName());
// 开启所有监听器
listenerManager.startAllListeners();
// 选举主节点
leaderService.electLeader();
//持久化job的配置信息
configService.persist(liteJobConfig);
LiteJobConfiguration liteJobConfigFromZk = configService.load(false);
// 持久化作业服务器上线信息
serverService.persistOnline(!liteJobConfigFromZk.isDisabled());
// 持久化作业运行实例上线相关信息,将服务实例注册到zk
instanceService.persistOnline();
// 设置 需要重新分片的标记
shardingService.setReshardingFlag();
// 初始化 作业监听服务
monitorService.listen();
// 初始化 调解作业不一致状态服务
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
/**
* 调度作业.
*
* @param cron CRON表达式
*/
public void scheduleJob(final String cron) {
try {
if (!scheduler.checkExists(jobDetail.getKey())) {
scheduler.scheduleJob(jobDetail, createTrigger(cron));
}
scheduler.start();
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
private JobDetail createJobDetail(final String jobClass) {
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
//忽略其它代码
}
public final class LiteJob implements Job {
private ElasticJob elasticJob;
private JobFacade jobFacade;
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
public final class JobExecutorFactory {
/**
* 获取作业执行器.
*
* @param elasticJob 分布式弹性作业
* @param jobFacade 作业内部服务门面服务
* @return 作业执行器
*/
"unchecked") (
public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
// ScriptJob
if (null == elasticJob) {
return new ScriptJobExecutor(jobFacade);
}
// SimpleJob
if (elasticJob instanceof SimpleJob) {
return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
}
// DataflowJob
if (elasticJob instanceof DataflowJob) {
return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
}
throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
}
}
// AbstractElasticJobExecutor.java
public final void execute() {
// 检查作业执行环境
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobExceptionHandler.handleException(jobName, cause);
}
// 获取当前作业服务器的分片上下文
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
// 发布作业状态追踪事件(State.TASK_STAGING)
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
}
// 跳过存在运行中的被错过作业
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
// 发布作业状态追踪事件(State.TASK_FINISHED)
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
shardingContexts.getShardingItemParameters().keySet()));
}
return;
}
// 执行作业执行前的方法
try {
jobFacade.beforeJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
// 执行普通触发的作业
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
// 执行被跳过触发的作业
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
// 执行作业失效转移
jobFacade.failoverIfNecessary();
// 执行作业执行后的方法
try {
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
}
作者简介
Xinchun OPPO高级后端工程师
目前负责分布式作业调度的研发,关注消息队列、redis数据库、ElasticSearch等中间件技术。