Spring Boot + MyBatis + Oracle + Elastic-Job 实现定时任务功能封装
Elastic-Job 简介:
Elastic-Job是当当开源的一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。
本文重点讲解Elastic-Job-lite与Spring Boot整合
第一步:Spring Boot 添加Elastic-Job-lite 依赖jar 包文件
<!-- 集成elastic-job 分布式定时任务 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
第二步:自定义Elastic-Job 配置读取类(在application.properties 中配置Elastic-Job-Lite 所依赖的ZooKeeper 注册中心参数)
# elastic-job定时任务配置参数
zookeeper.serverLists=127.0.0.1:2181
zookeeper.namespace=springboot_elasticjob
package com.zzg.common.elastic.job.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
/**
 * Spring configuration that publishes the ZooKeeper registry center as a bean.
 *
 * <p>Connection settings are injected from the {@code zookeeper.serverLists} and
 * {@code zookeeper.namespace} properties.
 */
@Configuration
public class ElasticRegCenterConfig {

    /** ZooKeeper server address list, injected from {@code zookeeper.serverLists}. */
    @Value("${zookeeper.serverLists}")
    private String serverList;

    /** ZooKeeper namespace, injected from {@code zookeeper.namespace}. */
    @Value("${zookeeper.namespace}")
    private String namespace;

    /**
     * Builds the registry center from the injected server list and namespace.
     * The bean declares {@code init} as its init method.
     *
     * @return a registry center configured with the injected settings
     */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter() {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}
第三步:基于Java 注解(Annotation)初始化Elastic-Job 分布式定时任务和分布式任务监听器
package com.zzg.common.elastic.job.init;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.zzg.common.elastic.job.annotation.ElasticSchedulerParam;
import com.zzg.common.elastic.job.listener.MyElasticJobListener;
/**
 * Registers every Spring bean annotated with {@link ElasticSchedulerParam} as an
 * Elastic-Job simple job once this configuration bean has been constructed.
 *
 * @author zzg
 */
@Configuration
public class ElasticSchedulerInit {
    // Logger used to report job registration.
    public static final Logger log = LoggerFactory.getLogger(ElasticSchedulerInit.class);

    @Autowired
    private ZookeeperRegistryCenter regCenter;

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Looks up all beans carrying {@link ElasticSchedulerParam}, builds a job
     * configuration from the annotation attributes and initializes one
     * {@link SpringJobScheduler} per bean, each with its own {@link MyElasticJobListener}.
     *
     * @throws IllegalStateException if an annotated bean does not implement {@link SimpleJob}
     *                               or its annotation cannot be resolved
     */
    @PostConstruct
    public void startSimpleJob() {
        applicationContext.getBeansWithAnnotation(ElasticSchedulerParam.class).forEach((beanName, bean) -> {
            // Resolve the annotation through the context instead of bean.getClass():
            // the runtime class may be a proxy that does not carry the annotation itself,
            // in which case plain reflection returns null and the next line would NPE.
            ElasticSchedulerParam config = applicationContext.findAnnotationOnBean(beanName,
                    ElasticSchedulerParam.class);
            if (config == null) {
                throw new IllegalStateException(
                        "Cannot resolve @ElasticSchedulerParam on bean '" + beanName + "'");
            }
            // Fail with a message naming the bean rather than a bare ClassCastException.
            if (!(bean instanceof SimpleJob)) {
                throw new IllegalStateException("Bean '" + beanName
                        + "' is annotated with @ElasticSchedulerParam but does not implement SimpleJob: "
                        + bean.getClass().getName());
            }
            SimpleJob simpleJob = (SimpleJob) bean;

            // cron and value are declared as aliases; use whichever one is non-blank.
            String cron = StringUtils.defaultIfBlank(config.cron(), config.value());
            LiteJobConfiguration jobConfiguration = getLiteJobConfiguration(simpleJob.getClass(), cron,
                    config.shardingTotalCount(), config.shardingItemParameters());

            new SpringJobScheduler(simpleJob, regCenter, jobConfiguration, new MyElasticJobListener()).init();
            log.info("Registered elastic job bean '{}' ({}) with cron '{}'", beanName,
                    simpleJob.getClass().getName(), cron);
        });
    }

    /**
     * Builds the lite job configuration for a simple job.
     *
     * <p>The job is named after {@code jobClass.getName()}, and the configuration is built
     * with {@code overwrite(true)}.
     *
     * @param jobClass               runtime class of the job
     * @param cron                   cron expression that triggers the job
     * @param shardingTotalCount     total number of shards
     * @param shardingItemParameters sharding item parameters, e.g. {@code 0=0}
     * @return the assembled job configuration
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron,
            final int shardingTotalCount, final String shardingItemParameters) {
        JobCoreConfiguration coreConfiguration = JobCoreConfiguration
                .newBuilder(jobClass.getName(), cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters)
                .build();
        SimpleJobConfiguration simpleJobConfiguration =
                new SimpleJobConfiguration(coreConfiguration, jobClass.getCanonicalName());
        return LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();
    }
}
package com.zzg.common.elastic.job.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;
/**
 * Type-level annotation that carries the scheduling parameters of a distributed
 * scheduled job: its cron expression and its sharding settings.
 *
 * @author zzg
 */
@Documented
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ElasticSchedulerParam {

    /** Cron expression; declared as an alias of {@link #value()}. Defaults to an empty string. */
    @AliasFor("value")
    String cron() default "";

    /** Cron expression; declared as an alias of {@link #cron()}. Defaults to an empty string. */
    @AliasFor("cron")
    String value() default "";

    /** Total number of shards. Defaults to {@code 1}. */
    int shardingTotalCount() default 1;

    /** Sharding item parameters. Defaults to {@code "0=0"}. */
    String shardingItemParameters() default "0=0";
}
package com.zzg.common.elastic.job.listener;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
/**
 * Elastic-Job listener that logs when a job execution begins and ends, together
 * with the elapsed time between the two callbacks.
 *
 * @author zzg
 */
public class MyElasticJobListener implements ElasticJobListener {
    private static final Logger logger = LoggerFactory.getLogger(MyElasticJobListener.class);

    /**
     * Default long date-time pattern used when no explicit format is supplied.
     */
    public static final String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";

    // Start timestamp (epoch millis) of the most recent execution seen by this listener instance.
    private long beginTime = 0;

    /**
     * Records the start time of the execution and logs it in {@link #TIME_FORMAT}.
     *
     * @param shardingContexts context of the job about to run; supplies the job name
     */
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        beginTime = System.currentTimeMillis();
        logger.info("===>{} JOB BEGIN TIME: {} <===", shardingContexts.getJobName(),
                convert2String(beginTime, TIME_FORMAT));
    }

    /**
     * Logs the end time in {@link #TIME_FORMAT} and the milliseconds elapsed since
     * {@link #beforeJobExecuted(ShardingContexts)} recorded the start time.
     *
     * @param shardingContexts context of the job that just ran; supplies the job name
     */
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        long endTime = System.currentTimeMillis();
        logger.info("===>{} JOB END TIME: {},TOTAL COST: {} ms <===", shardingContexts.getJobName(),
                convert2String(endTime, TIME_FORMAT), endTime - beginTime);
    }

    /**
     * Converts an epoch-millisecond timestamp into a formatted date string.
     *
     * @param time   timestamp in milliseconds since the epoch
     * @param format {@link SimpleDateFormat} pattern; {@link #TIME_FORMAT} is used when blank
     * @return the formatted date, or an empty string when {@code time} is not positive
     */
    public static String convert2String(long time, String format) {
        if (time <= 0L) {
            return "";
        }
        String pattern = StringUtils.isBlank(format) ? TIME_FORMAT : format;
        // A fresh formatter per call: SimpleDateFormat instances are not safe to share between threads.
        return new SimpleDateFormat(pattern).format(new Date(time));
    }
}
第四步:编写一个简单的分布式定时任务Demo
package com.zzg.elastic.job.instance;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.zzg.common.elastic.job.annotation.ElasticSchedulerParam;
/**
 * Demo job that runs every 30 seconds on a single shard, sleeps for ten seconds
 * to simulate work and prints progress to standard output.
 */
@ElasticSchedulerParam(cron = "*/30 * * * * ?", shardingTotalCount = 1, shardingItemParameters = "0=Core")
@Component
public class SimpleJobDemo implements SimpleJob {
    private static final Logger log = LoggerFactory.getLogger(SimpleJobDemo.class);

    /**
     * Executes one run of the demo job.
     *
     * @param shardingContext sharding context supplied by the scheduler; not used by this demo
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        System.out.println(new Date() + "SimpleJobDemo");
        try {
            Thread.sleep(10000);
            System.out.println("任务 SimpleJobDemo 开始干活。。。");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the call stack can still observe it.
            Thread.currentThread().interrupt();
            // Pass the exception itself so the stack trace goes to the log, not to stderr.
            log.error("SimpleJobDemo 定时任务报错:{}", e.getMessage(), e);
        }
        System.out.println(new Date() + "SimpleJobDemo 任务结束------------------------------------");
    }
}
定时任务运行效果截图:
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。