SpringBoot整合XXLJob

XXLJob简介

XXLJob是一个分布式任务调度平台,优点:开发迅速、学习简单、轻量级、易扩展。是大众点评员工xxl创建并维护,基于 GPL-3.0 开源,可放心商用,目前已经拥有庞大的使用群体。

简单来说,就是一个定时任务中间件,类似的产品有当当网开源的Elastic-Job。

特性

  • 简单:产品本身基于Java开发的,鉴于JVM的优秀,安装部署集成简单,和其他中间件、框架相比,避免了各种环境问题;同时程序员在项目中集成进来也很简单,有手就行。
  • 触发策略丰富:按照设置的Cron表达式触发、固定间隔触发、固定延时触发、API(事件)触发、人工触发、父子任务触发
  • 支持失败重试
  • 包含简单的告警和丰富的log,玩过定时任务的都知道,可观测性在实际项目中是多么的关键
  • 支持任务分片:例如将一个大任务,拆分为多个小任务,然后分给不同的执行器节点同时执行
  • 数据加密:调度中心和执行器之间的通讯进行数据加密,提升调度信息安全性
  • 丰富的钩子:各种回调可细粒度观察任务的生命周期
  • 跨语言:调度中心与执行器提供语言无关的 RESTful API 服务,第三方任意语言可据此对接调度中心或者实现执行器。除此之外,还提供了 “多任务模式”和“httpJobHandler”等其他跨语言方案;
  • 容器化:对云原生支持友好
  • 有webUI管理系统
  • 高可用:支持集群部署,可弹性扩容
  • 自动发现:执行器会周期性自动注册任务, 调度中心将会自动发现注册上来的执行器并将任务分配给执行器同时触发执行

模块

  • 调度中心:管理执行器、任务;查看任务日志、告警、报表;提供一个webUI管理系统,有简单的用户登录账号管理功能;依赖数据库
  • 执行器:执行器是独立的RESTFul服务,一般集成在业务服务中,它会开辟一个9999(默认)端口和调度中心交互

由上可知,XXLJob为C/S架构,调度中心本身可以高可用部署,执行器集成在业务微服务中,当业务微服务多实例部署的时候,执行器也就可以达到分布式和高可用了。

安装调度中心

初始化数据库

调度中心依赖数据库,安装前需先初始化数据库,初始化脚本可从github中获取 https://github.com/xuxueli/xxl-job/tree/master/doc/db

表说明:

  • xxl_job_lock:任务调度锁表
  • xxl_job_group:执行器信息表,维护任务执行器信息
  • xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等
  • xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等
  • xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到
  • xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能
  • xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息
  • xxl_job_user:系统用户表;

配置

  • 运行端口
  • 数据库连接
  • 报警邮箱
  • token:一个串,非必填,设置之后,执行器也需要配置此串,才能和调度中心交互,相当于一个简单的认证
  • 线程池配置

参见官方文档,本文重点放在SpringBoot整合XXLJob。调度中心的部署,尤其是高可用部署,后续单独开篇。

启动

调度中心就是一个SpringBoot程序,以xxljob2.4.0版本为例,其依赖的SpringBoot版本为 2.7.9,所以任何启动SpringBoot 的方式都可以,webui的默认访问地址:http://ip:8080/xxl-job-admin

整合执行器

pom


<dependency>    
    <groupId>com.xuxueli</groupId>    
    <artifactId>xxl-job-core</artifactId>  
    <version>2.4.0</version>
</dependency>


yml



server:
  port: 9009
logging:
  level:
    com.ramble: debug
xxl:
  job:
    admin:
      #调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
      addresses: http://127.0.0.1:8080/xxl-job-admin
      
    #执行器通讯TOKEN [选填]:非空时启用;
    accessToken: 
    executor:
      #执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
      appname: xxljob-demo-service
      #${spring.application.name}
      #执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
      address: ""
      #执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
      ip: ""
      #执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
      port: 0
      ###${server-port}
      #执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
      logpath: ./logs/xxl-job/jobhandler
      #执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
      logretentiondays: 30


XxlJobConfig



@Slf4j
@Configuration
public class XxlJobConfig {
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    @Value("${xxl.job.executor.appname}")
    private String appname;
    @Value("${xxl.job.executor.address}")
    private String address;
    @Value("${xxl.job.executor.ip}")
    private String ip;
    @Value("${xxl.job.executor.port}")
    private int port;
    @Value("${xxl.job.executor.logpath}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> start xxl-job config init");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}



启动执行器

如果一切顺利,将在控制台看到如下输出:


2023-10-09 11:36:23.162  INFO 15736 --- [       Thread-4] com.xxl.job.core.server.EmbedServer      : >>>>>>>>>>> xxl-job remoting server start success, nettype = class com.xxl.job.core.server.EmbedServer, port = 9999

看到这个,说明执行器配置已生效,执行器已经顺利和调度中心联系上了。

实践

可以简单的将任务分两个步骤,第一在执行器中定义一个任务具体需要干什么,第二在调度中心触发定义的任务

简单的定时任务

在执行器创建任务

在业务微服务中创建


@Slf4j
@Component
public class DemoJob {
    /**
     * 简单的job,调度器
     */
    @XxlJob("job1")
    public void job1() {
        log.debug("do job1");
    }
}


在调度中心创建执行器

  • 对于调度中心来说,执行器可能是多实例的,通过AppName确定为同一个集群
  • 执行器可以手动指定,也可以自动发现

创建成功之后可以在执行器列表看到。图片为编辑页面,所以可以看到已经有机器地址了。

在调度中心创建任务

  • JobHandler:需要和业务微服务中创建的任务名称一致

新增完毕之后启动,如果一切顺利,将可以在业务微服务中看到如下log:

2023-10-09 11:50:33.050 DEBUG 34868 --- [6-1696823413294] com.ramble.xxljob.task.DemoJob           : do job1
2023-10-09 11:50:38.044 DEBUG 34868 --- [6-1696823413294] com.ramble.xxljob.task.DemoJob           : do job1
2023-10-09 11:50:44.100 DEBUG 34868 --- [6-1696823413294] com.ramble.xxljob.task.DemoJob           : do job1
2023-10-09 11:50:48.052 DEBUG 34868 --- [6-1696823413294] com.ramble.xxljob.task.DemoJob           : do job1
2023-10-09 11:50:53.043 DEBUG 34868 --- [6-1696823413294] com.ramble.xxljob.task.DemoJob           : do job1

每5s执行了一次任务

带前置和后置处理的定时任务

XxlJob注解详解

XxlJob注解有三个参数:
value:JobHandler的名称,需要在执行器和调度中心保持一致
init:定时任务前置处理,仅在定时任务首次运行前执行一次
destory:定时任务后置处理,仅在定时任务销毁的时候执行一次

这里需要注意:

  • 一个定时任务可能反复执行多次,例如我们设置了固定10s执行一次,那么可能每10s就执行一次,但是前置处理和后置处理仅执行一次,前置处理和后置处理仅针对整个任务,而非任务的一次执行
  • 在前置处理中抛异常并不会阻止任务的创建和执行
  • 可以在前置处理和后置处理中访问数据库或者做一些业务逻辑

创建带前(后)置处理的任务



@Slf4j
@Component
public class DemoJob {
    /**
     * 创建带前(后)置处理的任务
     * Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
     * <p>
     * 执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
     * <p>
     * 任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
     */
    @XxlJob(value = "job2", init = "job2Init", destroy = "job2Destroy")
    public void job2() throws InterruptedException {
        LocalDateTime now = LocalDateTime.now();
        XxlJobHelper.log("进入job2,time={}", now.toString());
        log.debug("job2 - doSomething ...");
        Thread.sleep(2000);
        XxlJobHelper.log("离开job2,time={}", now.toString());
    }
    public void job2Init() {
        log.debug("job2Init - doSomething ...");
    }
    public void job2Destroy() {
        log.debug("job2Destroy - doSomething ...");
    }
}


调度中心需要创建对应job2的任务并启动。如果一切顺利将在执行器控制台看到如下log:



2023-10-09 13:31:44.104 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2Init - doSomething ...
2023-10-09 13:31:44.110 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2 - doSomething ...
2023-10-09 13:31:54.052 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2 - doSomething ...
2023-10-09 13:32:04.053 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2 - doSomething ...
2023-10-09 13:32:14.054 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2 - doSomething ...
Disconnected from the target VM, address: '127.0.0.1:50819', transport: 'socket'
2023-10-09 13:32:21.606  INFO 35848 --- [       Thread-4] com.xxl.job.core.server.EmbedServer      : >>>>>>>>>>> xxl-job remoting server stop.
2023-10-09 13:32:21.618  INFO 35848 --- [rRegistryThread] c.x.j.c.thread.ExecutorRegistryThread    : >>>>>>>>>>> xxl-job registry-remove success, registryParam:RegistryParam{registryGroup='EXECUTOR', registryKey='xxljob-demo-service', registryValue='http://192.168.3.191:9999/'}, registryResult:ReturnT [code=200, msg=null, content=null]
2023-10-09 13:32:21.618  INFO 35848 --- [rRegistryThread] c.x.j.c.thread.ExecutorRegistryThread    : >>>>>>>>>>> xxl-job, executor registry thread destroy.
2023-10-09 13:32:21.621  INFO 35848 --- [ionShutdownHook] com.xxl.job.core.server.EmbedServer      : >>>>>>>>>>> xxl-job remoting server destroy success.
2023-10-09 13:32:21.622 DEBUG 35848 --- [7-1696829504104] com.ramble.xxljob.task.DemoJob           : job2Destroy - doSomething ...
2023-10-09 13:32:21.622  INFO 35848 --- [7-1696829504104] com.xxl.job.core.thread.JobThread        : >>>>>>>>>>> xxl-job JobThread stoped, hashCode:Thread[xxl-job, JobThread-27-1696829504104,10,main]
2023-10-09 13:32:21.623  INFO 35848 --- [FileCleanThread] c.x.j.core.thread.JobLogFileCleanThread  : >>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destroy.

通过log可以观测到:

  • 执行器启动的时候,在任务首次运行前打印了init方法中的log
  • 而后定时任务按照既定执行策略执行
  • 当执行器服务停止的时候,打印了destory方法中的log
  • 通过XxlJobHelper.log 打印的日志,可以在调度中心 调度日志--->操作--->执行日志 中看到

父子任务

当两个任务需要关联触发的时候可以使用父子任务的功能,当然了子任务还可以有子任务。

这种情况只需要启动父任务,不需要启动子任务,当父任务执行成功了,会触发子任务的启动。当父任务执行失败了,不会触发子任务的启动。

父子执行器



 /**
     * 父任务
     */
     @XxlJob("jobFather")
     public void jobFather() {
         // 创建一个新的随机数生成器
         Random random = new Random();
         // 生成一个0到100之间的随机整数
         int randomNumber = random.nextInt(101);
         if (randomNumber % 2 == 0) {
             log.debug("do - jobFather - success");
             XxlJobHelper.handleSuccess();
         } else {
             log.debug("do - jobFather - fail");
             XxlJobHelper.handleFail("调用XxlJobHelper.handleFail,调度中心就任务此任务执行失败");
         }
     }
 
     /**
      * 子任务
      */
     @XxlJob("jobChild")
     public void jobChild() {
         log.debug("do - jobChild");
     }


  • 父、子执行器并没有什么特殊的地方
  • 在调度中心手动关联父、子任务后,父执行器执行成功后就会触发子执行器执行
  • 上述父执行器中通过XxlJobHelper.handleSuccess() 告诉调度中心,此任务执行成功了
  • 上述父执行器中通过XxlJobHelper.handleFail() 告诉调度中心,此任务执行失败了,调度中心将不会触发子任务

关联父子任务

  • 子任务、父任务分别创建
  • 然后编辑父任务,将子任务的id填写到“子任务ID”中,此时就关联上了
  • 启动父任务,不需要启动子任务
  • 子任务的启动交由调度中心触发,当父任务执行成功了,调度中心自然会启动子任务
  • 如果将子任务启动了,那么子任务将拥有两个触发维度,第一是根据子任务自身的调度类型和调度速度触发,第二是调度中心触发

执行器侧log



2023-10-10 09:13:00.276  INFO 19228 --- [       Thread-4] com.xxl.job.core.server.EmbedServer      : >>>>>>>>>>> xxl-job remoting server start success, nettype = class com.xxl.job.core.server.EmbedServer, port = 9999
2023-10-10 09:13:23.549  INFO 19228 --- [Pool-1699379094] c.xxl.job.core.executor.XxlJobExecutor   : >>>>>>>>>>> xxl-job regist JobThread success, jobId:29, handler:com.xxl.job.core.handler.impl.MethodJobHandler@2b43f314[class com.ramble.xxljob.task.DemoJob#jobFather]
2023-10-10 09:13:23.552 DEBUG 19228 --- [9-1696900403549] com.ramble.xxljob.task.DemoJob           : do - jobFather - fail
2023-10-10 09:13:28.495 DEBUG 19228 --- [9-1696900403549] com.ramble.xxljob.task.DemoJob           : do - jobFather - fail
2023-10-10 09:13:34.500 DEBUG 19228 --- [9-1696900403549] com.ramble.xxljob.task.DemoJob           : do - jobFather - success
2023-10-10 09:13:34.511  INFO 19228 --- [Pool-1699379094] c.xxl.job.core.executor.XxlJobExecutor   : >>>>>>>>>>> xxl-job regist JobThread success, jobId:30, handler:com.xxl.job.core.handler.impl.MethodJobHandler@7e3d2ebd[class com.ramble.xxljob.task.DemoJob#jobChild]
2023-10-10 09:13:34.512 DEBUG 19228 --- [0-1696900414511] com.ramble.xxljob.task.DemoJob           : do - jobChild
2023-10-10 09:13:38.494 DEBUG 19228 --- [9-1696900403549] com.ramble.xxljob.task.DemoJob           : do - jobFather - fail
2023-10-10 09:13:43.526 DEBUG 19228 --- [9-1696900403549] com.ramble.xxljob.task.DemoJob           : do - jobFather - success
2023-10-10 09:13:43.539 DEBUG 19228 --- [0-1696900414511] com.ramble.xxljob.task.DemoJob           : do - jobChild

通过日志可以观察到:

  • 一开始仅将父任务的 JobThread注册到了调度中心
  • 而后父任务按照调度速度执行
  • 当父任务执行失败没有触发子任务执行
  • 当父任务执行成功首先将子任务的 JobThread 注册到了调度中心,随即执行了子任务

调度中心-任务管理详解

执行器

任务需要绑定到执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器, 可在 "执行器管理" 进行设置

路由策略

当执行器集群部署时,提供丰富的路由策略,包括:

  • FIRST(第一个):固定选择第一个机器;
  • LAST(最后一个):固定选择最后一个机器;
  • ROUND(轮询);
  • RANDOM(随机):随机选择在线的机器;
  • CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
  • LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
  • LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
  • FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
  • BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
  • SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

调度过期策略

  • 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间
  • 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间

阻塞处理策略

调度过于密集执行器来不及处理时的处理策略

  • 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行
  • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
  • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本次调度任务

超时和重试

  • 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务
  • 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试

引用

技术交流QQ群:1158377441
欢迎关注我的微信公众号,后续博文将在公众号首发: