李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
07.SpringBoot集成Quartz实现分布式任务调度
Leefs
2021-09-03 PM
2080℃
0条
# 07.SpringBoot集成Quartz实现分布式任务调度 ### 前言 **本篇内容包括** + SpringBoot整合Quartz + Quartz持久化 + 分布式任务调度 ### 一、介绍 #### 1.1 Quartz集群 Quartz集群中每个节点都是一个单独的Quartz应用,它又管理着其他的节点。这个集群需要每个节点单独的启动或停止;和我们的应用服务器集群不同,独立的Quratz节点之间是不需要通信的。不同节点之间是通过数据库表来感知另一个应用。只有使用持久的JobStore才能完成Quartz集群。 ![07.SpringBoot集成Quartz实现分布式任务调度01.png](https://lilinchao.com/usr/uploads/2021/09/2659836787.png) #### 1.2 Quartz持久化 Quartz持久化配置提供了两种存储器: | 类型 | 优点 | 缺点 | | ------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | | RAMJobStore | 不要外部数据库,配置容易,运行速度快 | 因为调度程序信息是存储在被分配给 JVM 的内存里面,所以,当应用程序停止运行时,所有调度信息将被丢失。另外因为存储到JVM内存里面,所以可以存储多少个 Job 和 Trigger 将会受到限制 | | JDBC 作业存储 | 支持集群,因为所有的任务信息都会保存到数据库中,可以控制事物,还有就是如果应用服务器关闭或者重启,任务信息都不会丢失,并且可以恢复因服务器关闭或者重启而导致执行失败的任务 | 运行速度的快慢取决与连接数据库的快慢 | ### 二、操作步骤 需要提前创建一个SpringBoot项目 #### 2.1 引入依赖包 ```xml
org.springframework.boot
spring-boot-starter-web
commons-lang
commons-lang
2.5
mysql
mysql-connector-java
5.1.39
com.mchange
c3p0
0.9.5.4
``` #### 2.2 配置文件 通过在application.yml配置文件中对quartz进行相关配置。 ```yaml server: port: 8090 spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=utf-8&useSSL=false username: root password: 123456 quartz: jdbc: initialize-schema: never # 是否自动使用 SQL 初始化 Quartz 表结构。always:总是,never:不需要 job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。 properties: org: quartz: scheduler: instanceName: QuartzScheduler # 调度标识名 集群中每一个实例都必须使用相同的名称 instanceId: AUTO # 定时任务的实例编号, 如果手动指定需要保证每个节点的唯一性 threadPool: class: org.quartz.simpl.SimpleThreadPool threadCount: 100 # 线程池大小。默认为 10 。 threadPriority: 5 # 线程优先级 jobStore: misfireThreshold: 120000 ``` *说明:application.yml文件中的配置相当与Quartz中quartz.properties配置文件* #### 2.3 创建数据库表 ##### Quartz持久化过程创建数据库表方式 **第一种** 将yml文件中配置项initialize-schema在第一次执行时指定为always,会自动在数据库中生成表信息,当表创建完成后在将参数改成never ``` initialize-schema: always ``` **第二种** 在官网中下载Quartz对应版本的安装包在`docs\dbTables`目录下找到`tables_mysql_innodb.sql`文件在数据库中执行SQL。 官网下载地址:http://www.quartz-scheduler.org/downloads/ ##### 数据库表结构说明 | 表名 | 说明 | | ------------------------- | ----------------------------------------------------- | | qrtz_blob_triggers | 以Blob 类型存储的触发器 | | qrtz_calendars | 存放日历信息, quartz可配置一个日历来指定一个时间范围 | | qrtz_cron_triggers | 存放cron类型的触发器 | | qrtz_fired_triggers | 存放已触发的触发器 | | qrtz_job_details | 存放一个jobDetail信息 | | qrtz_job_listeners | job监听器 | | qrtz_locks | 存储程序的悲观锁的信息(假如使用了悲观锁) | | qrtz_paused_trigger_graps | 存放暂停掉的触发器 | | qrtz_scheduler_state | 调度器状态 | | qrtz_simple_triggers | 简单触发器的信息 | | qrtz_trigger_listeners | 触发器监听器 | | qrtz_triggers | 触发器的基本信息 | #### 2.4 定义Job + QuartzJobFirst ```java import com.dataojo.quartz.util.SchedulerUtils; import org.quartz.*; import org.springframework.scheduling.quartz.QuartzJobBean; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; /** * @author lilinchao * @date 2021/9/2 * @description 1.0 **/ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class QuartzJobFirst extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext jobExecutionContext){ SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); List
executionContexts; try { executionContexts = SchedulerUtils.getScheduler().getCurrentlyExecutingJobs(); } catch (SchedulerException e) { e.printStackTrace(); return; } for (JobExecutionContext executionContext : executionContexts){ JobKey jobKey = executionContext.getJobDetail().getKey(); Date fireTime = executionContext.getFireTime(); System.out.println(jobKey+",对应的执行时间是"+sf.format(fireTime)); } } } ``` + QuartzJobTwo ```java import org.quartz.*; import org.springframework.scheduling.quartz.QuartzJobBean; import java.text.SimpleDateFormat; import java.util.Date; /** * @author lilinchao * @date 2021/9/2 * @description 1.0 **/ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class QuartzJobTwo extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext jobExecutionContext){ SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("QuartzJobTwo执行时间是"+sf.format(new Date())); } } ``` **说明** + **QuartzJobBean** QuartzJobBean已经实现了job接口,并重写了接口中的execute()方法,在SpringBoot集成时候直接继承QuartzJobBean即可。执行逻辑在executeInternal()方法中。 + **@PersistJobDataAfterExecution** 告诉Quartz在成功执行了Job实现类的execute方法后(没有发生任何异常),更新JobDetail中JobDataMap的数据,使得该JobDetail实例在下一次执行的时候,JobDataMap中是更新后的数据,而不是更新前的旧数据。 + **@DisallowConcurrentExecution** 告诉Quartz不要并发地执行同一个JobDetail实例。 **总结** + 当某一个JobDetail实例到点运行之后,在其运行结束之前,不会再发起一次该JobDetail实例的调用,即使设置的该JobDetail实例的定时执行时间到了。 + JobDetail实例之间互不影响。 #### 2.5 实现ApplicationListener 完成动态调度 根据ApplicationListener的原理,其onApplicationEvent(ContextRefreshedEvent event) 方法会在初始化所有的bean之后被调用,因此我们可以在这里进行scheduler的创建、启动,以及注册trigger和job。 ```java import com.dataojo.quartz.job.QuartzJobFirst; import com.dataojo.quartz.job.QuartzJobTwo; import com.dataojo.quartz.util.SchedulerUtils; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; /** * @author lilinchao * @date 2021/9/2 * @description 1.0 **/ @Component public class StartApplicationListener implements ApplicationListener
{ @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { System.out.println("-------执行StartApplicationListener--------"); SchedulerUtils.scheduleCronJob(QuartzJobFirst.class, "*/5 * * * * ?"); SchedulerUtils.scheduleCronJob(QuartzJobTwo.class, "*/10 * * * * ?"); } } ``` #### 2.6 工具类 + **SchedulerUtils** ```java import org.apache.commons.lang.time.DateUtils; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.ParseException; import java.util.Date; /** * @author lilinchao * @date 2021/9/3 * @description 1.0 */ public class SchedulerUtils { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerUtils.class); private static Scheduler scheduler; private static String jobGroup = "group"; static { if (SpringContextUtils.getApplicationContext() != null) { scheduler = SpringContextUtils.getBean(Scheduler.class); } else { SchedulerFactory schedulerFactory = new StdSchedulerFactory(); try { scheduler = schedulerFactory.getScheduler(); scheduler.start(); } catch (SchedulerException e) { throw new RuntimeException(e); } } } private SchedulerUtils() { } public static Scheduler getScheduler() { return scheduler; } public static void scheduleCronJob(Class extends Job> jobClass, String cronExpression) { scheduleCronJob(jobClass, jobClass.getSimpleName(), cronExpression); } public static void scheduleCronJob(Class extends Job> jobClass, Object name, String cronExpression) { scheduleCronJob(jobClass, name, jobGroup, cronExpression); } public static void scheduleCronJob(Class extends Job> jobClass, Object name, String group, String cronExpression) { scheduleCronJob(jobClass, name, group, cronExpression, null); } public static void scheduleCronJob(Class extends Job> jobClass, Object name, String group, String cronExpression, JobDataMap jobDataMap) { scheduleCronJob(jobClass, name, group, cronExpression, jobDataMap, null, null); } public static void scheduleCronJob(Class extends Job> jobClass, Object name, String group, String cronExpression, JobDataMap jobDataMap, Date startDate, Date endDate) { try { JobKey jobKey = new JobKey(String.valueOf(name), group); if (!scheduler.checkExists(jobKey)) { JobBuilder jobBuilder = JobBuilder.newJob(jobClass); jobBuilder.withIdentity(jobKey); if (jobDataMap != null && !jobDataMap.isEmpty()) { jobBuilder.setJobData(jobDataMap); } JobDetail jobDetail = jobBuilder.build(); CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); TriggerBuilder
triggerBuilder = TriggerBuilder.newTrigger().withSchedule(cronScheduleBuilder); if (startDate != null) { triggerBuilder.startAt(startDate); } else { triggerBuilder.startNow(); } if (endDate != null) { triggerBuilder.endAt(endDate); } CronTrigger trigger = triggerBuilder.build(); scheduler.scheduleJob(jobDetail, trigger); } } catch (Exception e) { LOGGER.error("Submit job error, name=" + name + " and group=" + group, e); } } /** * 默认立即执行且只执行一次 * @param jobClass */ public static void scheduleSimpleJob(Class extends Job> jobClass) { scheduleSimpleJob(jobClass, jobClass.getSimpleName()); } public static void scheduleSimpleJob(Class extends Job> jobClass, Object name) { scheduleSimpleJob(jobClass, name, 0, 0); } /** * @param jobClass * @param name * @param intervalInMilliseconds 执行间隔 * @param repeatCount 重复次数,小于0的时候重复执行 */ public static void scheduleSimpleJob(Class extends Job> jobClass, Object name, long intervalInMilliseconds, int repeatCount) { scheduleSimpleJob(jobClass, name, jobGroup, intervalInMilliseconds, repeatCount); } public static void scheduleSimpleJob(Class extends Job> jobClass, Object name, String group, long intervalInMilliseconds, int repeatCount) { scheduleSimpleJob(jobClass, name, group, intervalInMilliseconds, repeatCount, null); } public static void scheduleSimpleJob(Class extends Job> jobClass, Object name, String group, long intervalInMilliseconds, int repeatCount, JobDataMap jobDataMap) { scheduleSimpleJob(jobClass, name, group, intervalInMilliseconds, repeatCount, jobDataMap, null, null); } public static void scheduleSimpleJob(Class extends Job> jobClass, Object name, String group, long intervalInMilliseconds, int repeatCount, JobDataMap jobDataMap, Date startDate, Date endDate) { try { JobKey jobKey = new JobKey(String.valueOf(name), group); if (!scheduler.checkExists(jobKey)) { JobBuilder jobBuilder = JobBuilder.newJob(jobClass); jobBuilder.withIdentity(jobKey); if (jobDataMap != null && !jobDataMap.isEmpty()) { jobBuilder.setJobData(jobDataMap); } JobDetail jobDetail = jobBuilder.build(); SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule(); simpleScheduleBuilder.withIntervalInMilliseconds(intervalInMilliseconds); if (repeatCount >= 0) { simpleScheduleBuilder.withRepeatCount(repeatCount); } else { simpleScheduleBuilder.repeatForever(); } TriggerBuilder
triggerBuilder = TriggerBuilder.newTrigger().withSchedule(simpleScheduleBuilder); if (startDate != null) { triggerBuilder.startAt(startDate); } else { triggerBuilder.startNow(); } if (endDate != null) { triggerBuilder.endAt(endDate); } SimpleTrigger trigger = triggerBuilder.build(); scheduler.scheduleJob(jobDetail, trigger); } } catch (Exception e) { LOGGER.error("Submit job error, name=" + name + " and group=" + group, e); } } public static void interrupt(Object name, String group) { JobKey jobKey = new JobKey(String.valueOf(name), group); try { if (scheduler.checkExists(jobKey)) { scheduler.interrupt(jobKey); } } catch (SchedulerException e) { LOGGER.warn("Interrupt job error, name=" + name + " and group=" + group, e); } } public static void deleteJob(Object name, String group) { JobKey jobKey = new JobKey(String.valueOf(name), group); try { if (scheduler.checkExists(jobKey)) { scheduler.deleteJob(jobKey); } } catch (SchedulerException e) { LOGGER.warn("Delete job error, name=" + name + " and group=" + group, e); } } public static boolean checkExists(Object name, String group) { JobKey jobKey = new JobKey(String.valueOf(name), group); try { return scheduler.checkExists(jobKey); } catch (SchedulerException e) { LOGGER.warn("CheckExists job error, name=" + name + " and group=" + group, e); } return false; } public static Date getNeedFireTime(String cron, Date startDate) { Date nextFireTime1 = getNextFireTime(cron, startDate); Date nextFireTime2 = getNextFireTime(cron, nextFireTime1); int intervals = (int) (nextFireTime2.getTime() - nextFireTime1.getTime()); Date cal1 = DateUtils.addMilliseconds(nextFireTime1, - intervals); Date cal2 = getNextFireTime(cron, cal1); Date cal3 = getNextFireTime(cron, cal2); while (!cal3.equals(nextFireTime1)) { cal1 = DateUtils.addMilliseconds(cal1, - intervals); cal2 = getNextFireTime(cron, cal1); cal3 = getNextFireTime(cron, cal2); if (cal3.before(nextFireTime1)) { intervals = -1000; } } return cal2; } public static Date getNextFireTime(String cron, Date startDate) { return getCronExpression(cron).getTimeAfter(startDate); } private static CronExpression getCronExpression(String cron) { try { return new CronExpression(cron); } catch (ParseException e) { throw new IllegalArgumentException(e); } } } ``` + **SpringContextUtils** 实现ApplicationContextAware的工具类,可以通过其它类引用它以操作spring容器及其中的Bean实例。 ```java import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @author lilinchao * @date 2021/9/3 * @description 1.0 */ @Component public class SpringContextUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtils.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { return applicationContext; } public static Object getBean(String beanName) throws BeansException { return applicationContext.getBean(beanName); } public static
T getBean(String beanName, Class
clazz) throws BeansException { return applicationContext.getBean(beanName, clazz); } public static
T getBean(Class
clazz) throws BeansException { return applicationContext.getBean(clazz); } public static Object getBean(String beanName, Object... args) throws BeansException { return applicationContext.getBean(beanName, args); } public static
T getBean(Class
clazz, Object... args) throws BeansException { return applicationContext.getBean(clazz, args); } } ``` **说明** Spring容器会检测容器中的所有Bean,如果发现某个Bean实现了ApplicationContextAware接口,Spring容器会在创建该Bean之后,自动调用该Bean的setApplicationContextAware()方法,调用该方法时,会将容器本身作为参数传给该方法——该方法中的实现部分将Spring传入的参数(容器本身)赋给该类对象的applicationContext实例变量,因此接下来可以通过该applicationContext实例变量来访问容器本身。 #### 2.7 运行结果 ![07.SpringBoot集成Quartz实现分布式任务调度02.jpg](https://lilinchao.com/usr/uploads/2021/09/2287616831.jpg) ### 三、项目目录结构 ![07.SpringBoot集成Quartz实现分布式任务调度03.jpg](https://lilinchao.com/usr/uploads/2021/09/3691505737.jpg) *附参考文章链接:* *https://www.jianshu.com/p/ab438d944669* *https://www.cnblogs.com/summerday152/p/14193968.htm* *https://gitee.com/tqbx/springboot-samples-learn/tree/master/spring-boot-quartz*
标签:
Quartz
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1411.html
上一篇
【转载】06.Quartz配置quartz.properties详解
下一篇
01.Yarn基础架构
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
6
标签云
Docker
Python
SQL练习题
容器深入研究
Nacos
FileBeat
Livy
链表
RSA加解密
SpringCloudAlibaba
Java工具类
Hbase
持有对象
Kafka
DataX
MyBatisX
SpringBoot
JavaSE
Thymeleaf
Java编程思想
队列
Spark RDD
Scala
Flink
锁
DataWarehouse
Golang基础
NIO
JVM
ClickHouse
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭