点击关注上方蓝字,阅读更多干货~
2 常见的传统定时任务
示例代码:
class MyTimer1 {
public static void main(String[] args){
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("Timer方式调度任务,当前时间:"+ new Date());
}
};
Timer timer = new Timer();
timer.schedule(timerTask,10,500);
}
}
执行结果:
class MyTimer2 {
public static boolean flag = false;
public static void main(String[] args) throws InterruptedException {
TimerTask timerTask = new TimerTask() {
public void run() {
System.out.println("Timer方式调度任务1,当前时间:"+ new Date());
}
};
TimerTask timerTask2 = new TimerTask() {
public void run() {
if(flag) {
throw new RuntimeException("抛出异常");
}
System.out.println("Timer方式调度任务2,当前时间:" + new Date());
}
};
Timer timer = new Timer();
timer.schedule(timerTask,10,500);
timer.schedule(timerTask2,10,500);
Thread.sleep(1000);
flag = true;
Thread.sleep(1000);
flag = false;
}
}
执行结果:
class MyTimer3 {
public static void main(String[] args) throws InterruptedException {
TimerTask timerTask = new TimerTask() {
public void run() {
System.out.println("Timer方式调度任务1,当前时间:"+ new Date());
}
};
TimerTask timerTask2 = new TimerTask() {
public void run() {
System.out.println("Timer方式调度任务2,当前时间:" + new Date());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Timer timer = new Timer();
timer.schedule(timerTask,10,500);
Thread.sleep(2000);
timer.schedule(timerTask2,10,500);
}
}
执行结果:
class MyTimer4{
public static boolean flag = false;
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
Runnable task = () -> {
long step = 1000L;
while(!flag) {
try {
long sleepTime = step - System.currentTimeMillis() % step;
Thread.sleep(sleepTime);
System.out.println("多线程休眠式调度任务,当前时间:" + new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executor.execute(task);
Thread.sleep(5000);
flag = true;
executor.shutdown();
}
}
执行结果:
class MyTimer5{
public static boolean flag = false;
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
Runnable task = () -> {
int times = 5;
long step = 1000L;
while(!flag) {
int currIndex = times--;
if(currIndex == 1) {
System.out.println("多线程休眠式调度任务,抛出异常,当前时间:" + new Date());
throw new RuntimeException("抛出异常");
}
try {
long sleepTime = step - System.currentTimeMillis() % step;
Thread.sleep(sleepTime);
System.out.println("多线程休眠式调度任务,当前时间:" + new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executor.execute(task);
Thread.sleep(5000);
flag = true;
executor.shutdown();
}
}
执行结果:
class MyTimer6{
public static boolean flag = false;
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
Runnable task = () -> {
long nextTime = 0;
long delayTime = 1000L;
while(!flag) {
long currTime = System.currentTimeMillis();
if(nextTime <= currTime){
nextTime = currTime + delayTime;
System.out.println("多线程轮询式调度任务,当前时间:" + new Date());
}
}
};
executor.execute(task);
Thread.sleep(5000);
flag = true;
executor.shutdown();
}
}
执行结果:
ScheduledExecutorService 是一个 ScheduledThreadPoolExecutor 线程池,和普通线程池不同的是调度时间不是由线程内的逻辑处理,而是由 DelayedWorkQueue 控制延时。
代码示例:
class MyTimer7{
public static void main(String[] args) {
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10);
Runnable task = () -> System.out.println("ScheduledExecutorService方式调度任务,当前时间:"+ new Date());
service.scheduleAtFixedRate(task,10L, 1000L,TimeUnit.MILLISECONDS);
}
}
执行结果:
代码示例:
class MyTimer8{
public static void main(String[] args) {
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10);
Runnable task = new Runnable() {
int times = 5;
@Override
public void run() {
int currIndex = times--;
if(currIndex == 1) {
System.out.println("ScheduledExecutorService方式调度异常任务,抛出异常,当前时间:" + new Date());
throw new RuntimeException("抛出异常");
}
System.out.println("ScheduledExecutorService方式调度异常任务,当前时间:" + new Date());
}
};
service.scheduleAtFixedRate(task,10L, 1000L,TimeUnit.MILLISECONDS);
}
}
执行结果:
Timer+TimerTask 方式可以作为处理定时任务的一个实现方式,但是对于一个 Timer 管理多个定时任务的场景,因为所有的任务是串行执行,所以如果其中的一个任务出现异常,会导致所有任务都终止,且当其中一个任务占用的时间过长时,会导致后续任务时间出现延迟,因此要使用 Timer 方式实现需要处理这几种情况。
Elastic-job | xxl-job | Quartz | |
说明 | 分布式任务调度框架,需要单独部署调度中心客户端,可视化界面可选部署 | 分布式任务调度框架,需要单独部署调度中心客户端,集成可视化界面 | 说明:基础的单体定时任务框架,是Java实现上的定时任务标准。 对任务的控制需要直接操作数据库或者调用api操作; 对任务的持久化需要引入数据库,在不考虑单服务多数据源的情况下需要写入业务库; 对任务的调度逻辑需要集成到业务系统中,数量增多会占用系统资源,影响业务性能以及自身调度精度。 |
强依赖组件 | Zookeeper | MySQL | |
业务侵入性 | 较低 | 较低 | |
高可用 | 由Zookeeper和实例共同控制 | 由数据库锁保证高可用 | |
并行调度 | 通过分片任务控制并发调度 | 系统多线程控制并发调度 | |
分片策略 | 支持多种分片策略,可自定义分片策略 | 以执行器为维度进行分片 | |
失败处理策略 | 选取空闲分片再次执行失败任务 | 策略包括:失败告警(默认)、失败重试 | |
弹性扩容 | 通过zk实现各服务的注册、控制及协调 | 通过数据库实现动态扩容,执行器超出一定数量则会影响性能 |
因为业务需求,需要定时清洗数据,整体由1次全量任务和持续的增量任务组成,因此需要使用定时任务来实现增量任务的调度,不存在较高的并发场景,但是单次任务的处理量可能较大。
就功能性来说,Xxl-job 和 Elastic-job 相差不多,且都有较大的用户基础和技术文档,而 xxl-job 只依赖数据库作为集群注册中心,不需要额外引入 Zookeeper 组件,且 xxl-job 更加侧重于业务实现的简单和管理的方便,学习成本更低,且集成的可视化界面使用起来比较友好,结合系统的并发场景来看,xxl-job是一个更好的选择。
一般情况下,定时任务的主要包括3个组件,计时器、调度器以及执行器,计时器负责计算时间,并在目标时间通知调度器,调度器负责遍历任务,并通知目标执行器执行任务,其中计时器和调度器影响任务调度时间,执行器影响任务结果。
作为定时任务最主要的组件之一,计时器的性能直接影响到定时任务的调度精度,常见的计时器有以下两种设计,时间轴和时间轮。
时间轴计时器是最基础的计时器算法,每隔一段时间比较当前时间和目标时间,优点是实现方式简单,缺点是因为长度不固定,不方便将定时任务分组。
时间轮就是将时间轴维护成了类似钟表的结构,低级计时器每走完一轮则使高级计时器往前一格,而各个时间的跨度可以各自定义。为了方便设置,也会定义成时间单位的跨度,如60秒为1分钟,24小时为1天,示意图如下。时间轮的好处是可以减少同时运行的计时器数量,更方便将任务分组,减少调度器需要遍历的任务数量。
图9 时间轮结构示意图
调度中心作为分布式定时任务的管理者,经常会出现一个时间点内处理很多任务的场景,如果由计时器来负责任务的调度会导致计时出现偏差,比如调度10:00:00的所有任务需要10s,那么如果存在需要在10:00:05执行的任务则最早会在10:00:10开始调度,会导致后续任务出现偏差。
因此,为了保证计时器时间的准确性,需要将调度功能从计时器的功能中拆解出来,由调度器来进行某个时间点的任务调度,让计时器只需要专注于计时,降低后续调度任务的误差。
代码示例:
public class JobScheduleHelper {
private class ScheduleThread extends Thread {
public void run() {
try {
//线程在4~5秒后执行任务
TimeUnit.MILLISECONDS.sleep(PRE_READ_MS - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
while(!scheduleThreadToStop){
//计算任务执行时间
long start = System.currentTimeMillis();
boolean preReadSuc = true;//读取到数据则为true
List<XxlJobInfo> scheduleList = getList();//省略获取待执行任务的流程
long nowTime = System.currentTimeMillis();
if (!CollectionUtils.isEmpty(scheduleList)) {//不为空则处理任务
resolveSchedule(nowTime, scheduleList);
} else {
preReadSuc = false;
}
long cost = System.currentTimeMillis() - start;
//执行时间小于1s则需要等待
//(控制任务表的查询的间隔大于1秒,如果有任务执行则等待5秒 => 降低数据库压力)
if (cost < 1000) {
try {
//有任务处理时等待4~5秒,否则等待1秒
long delayTime = preReadSuc ? 1000 : PRE_READ_MS;
TimeUnit.MILLISECONDS.sleep(delayTime - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
//异常处理
}
}
}
}
}
//计时器线程
private class RingThread extends Thread{
public void run() {
while (!ringThreadToStop) {
List<Integer> ringItemData = new ArrayList<>();
//回溯前2s待执行的任务并写入ringItemData
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond + 60 - i) % 60 );
if (!CollectionUtils.isEmpty(tmpData)) {
ringItemData.addAll(tmpData);
}
}
//遍历触发任务
for (int jobId: ringItemData) {
//触发任务
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
ringItemData.clear();
}
}
}
}
上文已经说完了 xxl-job 的计时器精度保证机制,在这里我们来讲讲调度器的精度保证机制。从调度器的功能上我们可以知道,影响调度器精度的两个方面是待遍历任务数和遍历效率,其中遍历效率因为涉及到各类遍历算法,这里不做赘述。
传统定时中的调度部分是通过遍历待执行列表来控制任务执行的,实现较为简单,但是这种方式可能因为调度器需要遍历的任务数量较多,导致任务的执行时机出现误差,而 xxl-job 中则通过将任务分组,提高了这种场景出现的数量条件。
public void resolveSchedule(long nowTime, List<XxlJobInfo> scheduleList) throws Exception {
int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
//将任务存入计时器
pushTimeRing(ringSecond, jobInfo.getId());
}
private void pushTimeRing(int ringSecond, int jobId){
//将任务根据相对时间分组
if(!ringData.containsKey(ringSecond)) {
ringData.put(ringSecond,new ArrayList<>());
}
List<Integer> ringItemData = ringData.get(ringSecond);
ringItemData.add(jobId);
}
public void run(){
while (!ringThreadToStop) {
//处理任务回溯时间为2秒(降低单次任务执行时间过长导致异常的概率)
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
//将两秒内的待处理任务都存入到ringItemData中
for (int i = 0; i < 2; i++) {
//如当前为11:00:01->(1+60-0)%60 = 01
//如当前为11:00:01->(1+60-1)%60 = 00
List<Integer> tmpData = ringData.remove((nowSecond + 60 - i) % 60);
if (!CollectionUtils.isEmpty(tmpData)) {
ringItemData.addAll(tmpData);
}
//遍历触发任务
for (int jobId: ringItemData) {
//触发普通任务
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
ringItemData.clear();
}
}
}
调度中心既然只是作为一个定时任务的管理平台,那自然不会去维护对应定时任务的逻辑,各个任务的执行逻辑都由各自服务自己负责,而调度中心怎么知道对应的执行器呢?
xxl-job 中一共提供了两种执行器的注册方式,手动录入和自动注册,因为手动录入的逻辑较为简单,这里我们主要针对自动注册逻辑来进行讲解。
执行器启动时会读取配置,通过遍历配置中的调度中心地址列表来向任务调度中心注册该执行器。
public class XxlJobExecutor {
private String adminAddresses;
private String accessToken;
public void start() throws Exception {
initAdminBizList(adminAddresses, accessToken);
initEmbedServer(address, ip, port, appname, accessToken);
}
//初始化服务管理
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken){
String[] addressArray = adminAddresses.trim().split(",");
for (String address: addressArray) {
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
adminBizList.add(adminBiz);
}
}
//初始化内嵌服务
private void initEmbedServer(String address, String ip, int port, String appName, String accessToken){
embedServer = new EmbedServer();
embedServer.start(address, port, appName, accessToken);
}
}
启动内嵌服务器:
public class EmbedServer {
private ExecutorBiz executorBiz;
public void start(String address, int port, String appName, String accessToken) {
startRegistry(appName, address);
}
public void startRegistry(String appName, String address) {
//启动注册线程
ExecutorRegistryThread.getInstance().start(appName, address);
}
}
public class ExecutorRegistryThread {
public void start(String appName, String address) {
RegistryParam registryParam = new RegistryParam(RegistryType.EXECUTOR.name(), appName, address);
List<AdminBiz> adminBizList = XxlJobExecutor.getAdminBizList();
for (AdminBiz adminBiz : adminBizList) {
//注册当前执行器
ReturnT<String> registryResult = adminBiz.registry(registryParam);
}
}
public class AdminBizClient implements AdminBiz {
public ReturnT<String> registry(RegistryParam registryParam) {
//访问调度中心提供的接口
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
}
任务调度由调度中心和执行器共同处理。客户端的代码如下:
public class XxlJobTrigger {
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
ReturnT<String> triggerResult = runExecutor(triggerParam, address);
}
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
//发送post请求到执行器
ReturnT<String> runResult = executorBiz.run(triggerParam);;
}
}
public class ExecutorBizClient implements ExecutorBiz {
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
}
在这里我们可以看出,调度中心经过一系列处理,最终访问执行器提供的 run 接口来通知执行器。执行器的代码如下:
public class EmbedServer {
private ExecutorBiz executorBiz = new ExecutorBizImpl();
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
switch (uri) {
case "/run": {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
}
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
}
}
}
public class ExecutorBizImpl implements ExecutorBiz {
public ReturnT<String> run(TriggerParam triggerParam) {
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {
//从执行器注册的Handler列表中读取目标的Handler
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
}else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
//调用方式2
}else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
//调用方式3
}else{
//打印异常日志
}
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
}
}
上面已经说完定时任务的正常调度逻辑,这里说说定时任务失败时的处理逻辑。源码如下:
public class JobFailMonitorHelper {
public void start(){
monitorThread = new Thread(() -> {
while (!toStop) {
XxlJobAdminConfig adminConfig = XxlJobAdminConfig.getAdminConfig();
//查询出现异常的日志Id
List<Long> failLogIds = logDao.findFailJobLogIds(1000);
for (long failLogId: failLogIds) {
int lockRet = logDao.updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
XxlJobLog log = logDao.load(failLogId);
// 1、异常重试,通过日志判断设置的重试次数,设置了失败重试次数的任务会自动重试
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
}
XxlJobInfo info = adminConfig.getXxlJobInfoDao().loadById(log.getJobId());
// 2、判断当前任务是否需要告警
// 告警状态:0=默认、-1=锁定状态、1=无需告警、2=告警成功、3=告警失败
int newAlarmStatus;
if (info != null) {
//处理告警
boolean alarmResult = adminConfig.getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult ? 2 : 3;
} else {
newAlarmStatus = 1;
}
adminConfig.getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
//每10s处理一次任务重试逻辑
TimeUnit.SECONDS.sleep(10);
}
}
}
}
public class JobAlarmer implements ApplicationContextAware, InitializingBean {
private List<JobAlarm> jobAlarmList;
public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {
if (Collection.isEmpty(jobAlarmList)) {
return false;
}
//只要有1个任务失败则返回false
boolean result = true;
for (JobAlarm alarm: jobAlarmList) {
try {
if(!alarm.doAlarm(info, jobLog)) {
result = false;
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
result = false;
}
}
return result;
}
public class EmailJobAlarm implements JobAlarm {
public boolean doAlarm(XxlJobInfo info, XxlJobLog jobLog){
boolean alarmResult = true;
if(ObjectUtils.isEmpty(info)){
return true;
}
//如果设置了告警邮箱地址,则在发生异常时发送告警邮件
if (StringUtils.isEmpty(info.getAlarmEmail())) {
return true;
}
// 拼接邮件内容
String alarmContent = "";
//处理邮箱地址,用","分隔
List<String> emailList = Arrays.asList(info.getAlarmEmail().split(","));
Set<String> emailSet = new HashSet<String>(emailList);
for (String email: emailSet) {
JavaMailSender mailSender = XxlJobAdminConfig.getAdminConfig().getMailSender();
try {
MimeMessage mimeMessage = mailSender.createMimeMessage();
//省略设置邮件参数
//调用Spring接口发送邮件
mailSender.send(mimeMessage);
}catch (Exception e) {
alarmResult = false;
}
}
return alarmResult;
}
}
通过本篇文章的分析,对于
本文作者
叶刀刀,来自缦图互联网中心后端团队。
--------END--------
也许你还想看