Spring Boot + MyBatis+Oracle +Elastic-job 实现定时任务功能封住分析

虾米哥 阅读:283 2021-03-31 13:28:09 评论:0

Elastic-Job 简介:

Elastic-Job是当当开源的一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。

本文重点讲解Elastic-Job-lite与Spring Boot整合

第一步:Spring Boot 添加Elastic-Job-lite 依赖jar 包文件

     <!-- 集成elastic-job 分布式定时任务  --> 
		 <dependency> 
            <groupId>com.dangdang</groupId> 
            <artifactId>elastic-job-lite-spring</artifactId> 
            <version>2.1.5</version> 
        </dependency> 

第二步:自定义Elastic-Job 读取配置类(application.properties配置Elastic-job-lite 依赖的zookeeper 分布式调度服务器参数)

 
# elastic-job定时任务配置参数 
zookeeper.serverLists=127.0.0.1:2181 
zookeeper.namespace=springboot_elasticjob 
package com.zzg.common.elastic.job.config; 
 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
 
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; 
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; 
 
@Configuration 
public class ElasticRegCenterConfig { 
 
	// zookeeper地址信息 
	@Value("${zookeeper.serverLists}") 
	private String serverList; 
	// 命名空间 
	@Value("${zookeeper.namespace}") 
	private String namespace; 
 
	@Bean(initMethod = "init") 
	public ZookeeperRegistryCenter regCenter() { 
		ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace); 
		return new ZookeeperRegistryCenter(zookeeperConfiguration); 
	} 
 
} 

第三步:初始化Elastic-Job分布式定时任务初始化和分布式任务监听器,基于Java 注解(Annotation)

package com.zzg.common.elastic.job.init; 
 
import javax.annotation.PostConstruct; 
import org.apache.commons.lang3.StringUtils; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.context.ApplicationContext; 
import org.springframework.context.annotation.Configuration; 
import com.dangdang.ddframe.job.api.simple.SimpleJob; 
import com.dangdang.ddframe.job.config.JobCoreConfiguration; 
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; 
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; 
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler; 
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; 
import com.zzg.common.elastic.job.annotation.ElasticSchedulerParam; 
import com.zzg.common.elastic.job.listener.MyElasticJobListener; 
/** 
 * 分布式任务实例化 
 * @author zzg 
 * 
 */ 
@Configuration 
public class ElasticSchedulerInit { 
	// 日志记录 
	public static final Logger log = LoggerFactory.getLogger(ElasticSchedulerInit.class); 
 
	@Autowired 
	private ZookeeperRegistryCenter regCenter; 
 
	@Autowired 
	private ApplicationContext applicationContext; 
 
	@PostConstruct 
	public void startSimpleJob() { 
		applicationContext.getBeansWithAnnotation(ElasticSchedulerParam.class).forEach((className, obj) -> { 
			ElasticSchedulerParam config = obj.getClass().getAnnotation(ElasticSchedulerParam.class); 
			String cron = StringUtils.defaultIfBlank(config.cron(), config.value()); 
			int shardingTotalCount = config.shardingTotalCount(); 
			String shardingItemParameters = config.shardingItemParameters(); 
			MyElasticJobListener elasticJobListener = new MyElasticJobListener(); 
			SimpleJob simpleJob = (SimpleJob) obj; 
			new SpringJobScheduler(simpleJob, regCenter, 
					getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), 
					elasticJobListener).init(); 
		}); 
	} 
 
	// 私有方法 
	private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, 
			final int shardingTotalCount, final String shardingItemParameters) { 
		return LiteJobConfiguration 
				.newBuilder( 
						new SimpleJobConfiguration( 
								JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount) 
										.shardingItemParameters(shardingItemParameters).build(), 
								jobClass.getCanonicalName())) 
				.overwrite(true).build(); 
	} 
 
} 
package com.zzg.common.elastic.job.annotation; 
 
import java.lang.annotation.Documented; 
import java.lang.annotation.ElementType; 
import java.lang.annotation.Inherited; 
import java.lang.annotation.Retention; 
import java.lang.annotation.RetentionPolicy; 
import java.lang.annotation.Target; 
 
import org.springframework.core.annotation.AliasFor; 
 
/** 
 * 分布式定时任务注解 
 *  
 * @author zzg 
 * 
 */ 
 
@Target({ ElementType.TYPE }) 
@Retention(RetentionPolicy.RUNTIME) 
@Inherited 
@Documented 
public @interface ElasticSchedulerParam { 
	@AliasFor("value") 
	String cron() default ""; 
 
	@AliasFor("cron") 
	String value() default ""; 
 
	int shardingTotalCount() default 1; 
 
	String shardingItemParameters() default "0=0"; 
} 
package com.zzg.common.elastic.job.listener; 
 
import java.text.SimpleDateFormat; 
import java.util.Date; 
 
import org.apache.commons.lang3.StringUtils; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
 
import com.dangdang.ddframe.job.executor.ShardingContexts; 
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; 
 
/** 
 * 自定义ElasticJob 定时器 
 * @author zzg 
 * 
 */ 
public class MyElasticJobListener implements ElasticJobListener { 
	private static final Logger logger = LoggerFactory.getLogger(MyElasticJobListener.class); 
 
	/** 
	 * 长日期格式 
	 */ 
	public static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; 
	private long beginTime = 0; 
 
	@Override 
	public void beforeJobExecuted(ShardingContexts shardingContexts) { 
		beginTime = System.currentTimeMillis(); 
 
		logger.info("===>{} JOB BEGIN TIME: {} <===", shardingContexts.getJobName(), beginTime); 
	} 
 
	@Override 
	public void afterJobExecuted(ShardingContexts shardingContexts) { 
		long endTime = System.currentTimeMillis(); 
		logger.info("===>{} JOB END TIME: {},TOTAL CAST: {} <===", shardingContexts.getJobName(), endTime, 
				endTime - beginTime); 
	} 
 
	/** 
	 * 将长整型数字转换为日期格式的字符串 
	 * 
	 * @param time 
	 * @param format 
	 * @return 
	 */ 
	public static String convert2String(long time, String format) { 
		if (time > 0l) { 
			if (StringUtils.isBlank(format)) 
				format = TIME_FORMAT; 
 
			SimpleDateFormat sf = new SimpleDateFormat(format); 
			Date date = new Date(time); 
 
			return sf.format(date); 
		} 
		return ""; 
	} 
 
} 

第四步:编写一个简单的分布式定时任务Demo

package com.zzg.elastic.job.instance; 
 
import java.util.Date; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.stereotype.Component; 
import com.dangdang.ddframe.job.api.ShardingContext; 
import com.dangdang.ddframe.job.api.simple.SimpleJob; 
import com.zzg.common.elastic.job.annotation.ElasticSchedulerParam; 
 
@ElasticSchedulerParam(cron = "*/30 * * * * ?", shardingTotalCount = 1, shardingItemParameters = "0=Core") 
@Component 
public class SimpleJobDemo implements SimpleJob { 
	private static final Logger log = LoggerFactory.getLogger(SimpleJobDemo.class); 
 
 
	@Override 
	public void execute(ShardingContext shardingContext) { 
		// TODO Auto-generated method stub 
		System.out.println(new Date() + "SimpleJobDemo"); 
		try { 
			Thread.sleep(10000); 
			System.out.println("任务 SimpleJobDemo 开始干活。。。"); 
		} catch (InterruptedException e) { 
			e.printStackTrace(); 
			log.error("SimpleJobDemo 定时任务报错:" + e.getMessage()); 
		} 
		System.out.println(new Date() + "SimpleJobDemo 任务结束------------------------------------"); 
	} 
} 

定时任务运行效果截图:

声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

发表评论
搜索
KIKK导航

KIKK导航

排行榜
关注我们

一个IT知识分享的公众号