一次阿里云Schedulerx换成Spring定时任务的过程
* 手动定时任务处理调用阿里云定时器 {@link com.suyun.vehicle.conf.SpringScheduleConfig}*
·
1、当前现状
- 所有的任务已经迁移到阿里云Schedulerx;
- 阿里云Schedulerx是按照调用次数收费,有些任务每秒调用一次,费用太高;
2、明确需求
- 需要把执行非常频繁的定时任务从阿里云迁移(阿里云收费根据调用次数,且可以针对单个任务设置是否启动定时调度);
- 服务是集群部署,存在广播模式和单机模式,大部分是单机模式;
- 有时任务失败,需要人工手动触发;
- 对新增的定时任务,不想在阿里云控制台过多配置,但又需要手动触发;
3、实现思路
保留原有阿里云定时任务,禁用执行非常频繁的手动任务,禁用后不会自动执行,但可以手动触发;
禁用的任务使用Spring的定时调度工具,(引入开源的调度工具比较麻烦);
需要考虑在集群环境下,广播和单机模式的实现,引入Redis锁机制;
可以定义一个任务,用来调起真正执行的认为,也使用手动触发模式;
4、注意事项
集群环境,定义了一个任务执行周期 3分钟一次,怎么样保证都是在3分钟执行一次,机器之间并不知道别的机器是否已经执行过了任务
每个任务,用一台机器作为任务分发机器,除非宕机,重新选择;
- 添加一个调度器,30s执行一次,一直往redis添加IP信息,设置3分钟过期,用于判断服务是否可用,超过3分钟没有更新数据,判断当前IP服务已宕机;
- 分发任务IP:在执行任务之前,setIf(当前任务,IP),如果能设置成功,设置失效时间大于当前调度下次执行时间30s后,如果不能设置,查回IP,如果IP相同,更新任务IP;
- 如果不是分发任务IP,直接完成,否则,查询活跃IP,之后根据广播通知方式,指定IP等策略获取有效的执行任务IP;
- 如果当前有效IP只有一个,并且等于分发IP,直接执行;
- IP多个或不同,直接发出redis通知,执行完成;
- Redis消息订阅,直接通过beanName获取对象,调用单独执行逻辑方法;
package com.suyun.vehicle.conf;
import com.google.common.collect.Maps;
import com.suyun.vehicle.helper.SpringScheduleHelper;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.Task;
import org.springframework.scheduling.support.ScheduledMethodRunnable;
import org.springframework.scheduling.support.SimpleTriggerContext;
import org.springframework.util.CollectionUtils;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Description:
* <p>
*
* </p>
*
* @Author: leo.xiong
* @CreateDate: 2023/7/4 14:06
* @Email: leo.xiong@suyun360.com
* @Since:
*/
@EnableScheduling
@org.springframework.context.annotation.Configuration("springScheduleConfig")
public class SpringScheduleConfig implements SchedulingConfigurer, ApplicationListener, ApplicationContextAware {
@Value("${scheduler.task.size:8}")
private int taskSize;
private static ScheduledTaskRegistrar TASK_REGISTRAR;
private static ApplicationContext applicationContext;
private static final Map<String, Map<String, CronTask>> CLASS_NAME_METHOD_NAME_CRON_TASK_MAP = Maps.newHashMap();
/**
* <p>注入线程池</p>
* <p>{@link ScheduledAnnotationBeanPostProcessor}实现Spring定时器</p>
* <p>ScheduledAnnotationBeanPostProcessor->MergedBeanDefinitionPostProcessor->BeanPostProcessor扫描注解@Schedule,生成任务</p>
* <p>ScheduledAnnotationBeanPostProcessor->SmartInitializingSingleton和ApplicationListener接口,Spring Bean注入,还未发出通知之前afterSingletonsInstantiated执行一次,Spring 通知执行一次</p>
* <p>1、默认使用一个线程执行,如果多个任务都是每秒执行一次获取有任务执行时间过长,会造成任务延迟,严重直接阻塞任务;</p>
* <p>2、重写configureTasks方法,注入线程池</p>
* <p>3、异步执行需要开启@EnableAsync异步注解,方法上添加@Async注解</p>
* <p>4、ScheduledThreadPoolExecutor.ScheduledFutureTask方法,首选判断任务是否执行,不管手动还是定时,如果是,跳过这次执行,2判断是否非周期调用,直接执行,如果周期调用,执行,如果call()抛出异常,不在重新设置下次任务执行时间,定时任务不在执行</p>
*
* @param taskRegistrar
*/
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(taskScheduler());
addFirstCron(taskRegistrar);
TASK_REGISTRAR = taskRegistrar;
}
/**
* <p>{@link ScheduledThreadPoolExecutor}和{@link ThreadPoolTaskScheduler}只能使用一个,IOC容器互斥</p>
* <p>ScheduledThreadPoolExecutor只能使用设置固定核心线程长度的线程池,灵活性不足</p>
* <p>ThreadPoolTaskScheduler实例化Bean时,实现了InitializingBean接口,实现了异步调度接口</p>
* <p>最终执行任务的线程池都为{@link java.util.concurrent.ScheduledExecutorService}实现的{@link ScheduledThreadPoolExecutor}实例</p>
*
* @return
*/
@Bean
@DependsOn(value = {"springScheduleHelper"})
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 配置线程池大小,根据任务数量定制
taskScheduler.setPoolSize(taskSize);
// 线程名称前缀
taskScheduler.setThreadNamePrefix("suyun-ThreadPoolTaskScheduler-thread-");
// 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
// 线程池关闭前最大等待时间,确保最后一定关闭,线程任务最大执行时间30分钟
taskScheduler.setAwaitTerminationSeconds(1800);
// 线程池关闭时等待所有任务完成
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
// 任务拒绝策略 任务->判断核心线程是否存在空闲(直接执行)->判断队列是否已满(已满拒绝,否则放入队列)->判断是否达到最大线程(没有,创建执行,否则返回,等待可用线程)
taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return taskScheduler;
}
/**
* 预计下次执行时间
*
* @param className
*/
public static Long nextTime(String className) {
if (!className.contains(":")) {
return nextTime(className, null);
}
String[] classNameMethodNames = className.split(":");
return nextTime(classNameMethodNames[0], classNameMethodNames[1]);
}
/**
* 预计下次执行时间
*
* @param className 对象名称类名
* @param methodName 执行的方法名,如果类中只有一个@Scheduler,则方法名可以为空
*/
public static Long nextTime(String className, String methodName) {
CronTask cronTask = getCronTask(className, methodName);
if (cronTask == null) {
return null;
}
return cronTask.getTrigger().nextExecutionTime(new SimpleTriggerContext()).getTime();
}
/**
* 获取任务队列
*
* @param className
* @param methodName
* @return
*/
private static CronTask getCronTask(String className, String methodName) {
LinkedHashMap<String, CronTask> methodNameCronTaskMap = (LinkedHashMap) CLASS_NAME_METHOD_NAME_CRON_TASK_MAP.get(className);
if (CollectionUtils.isEmpty(methodNameCronTaskMap)) {
return null;
}
if (methodNameCronTaskMap.size() == 1) {
for (Map.Entry<String, CronTask> methodNameCronTaskEntry : methodNameCronTaskMap.entrySet()) {
return methodNameCronTaskEntry.getValue();
}
}
if (StringUtils.isEmpty(methodName)) {
return null;
}
return methodNameCronTaskMap.get(methodName);
}
/**
* 根据名称获取容器对象信息
*
* @param name
* @param <T>
* @return
*/
public static <T> T getBean(String name) {
return (T) applicationContext.getBean(name);
}
/**
* 根据类型获取容器对象信息
*
* @param requiredType
* @param <T>
* @return
*/
public static <T> T getBean(Class<T> requiredType) {
return applicationContext.getBean(requiredType);
}
/**
* 根据名称类型获取容器对象信息
*
* @param requiredType
* @param <T>
* @return
*/
public static <T> T getBean(String name, Class<T> requiredType) {
return applicationContext.getBean(name, requiredType);
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TASK_REGISTRAR == null || CollectionUtils.isEmpty(TASK_REGISTRAR.getCronTaskList())) {
return;
}
ScheduledMethodRunnable scheduledMethodRunnable = null;
Map<String, CronTask> methodNameCronTaskMap = null;
String className = null, methodName = null;
for (CronTask cronTask : TASK_REGISTRAR.getCronTaskList()) {
scheduledMethodRunnable = (ScheduledMethodRunnable) cronTask.getRunnable();
className = scheduledMethodRunnable.getTarget().getClass().getName();
methodNameCronTaskMap = CLASS_NAME_METHOD_NAME_CRON_TASK_MAP.get(className);
if (methodNameCronTaskMap == null) {
methodNameCronTaskMap = Maps.newLinkedHashMap();
CLASS_NAME_METHOD_NAME_CRON_TASK_MAP.put(className, methodNameCronTaskMap);
}
methodName = scheduledMethodRunnable.getMethod().getName();
methodNameCronTaskMap.put(methodName, cronTask);
}
}
/**
* 手动添加{@link SpringScheduleHelper}任务进入定时器
* SpringScheduleHelper bean注入在configureTasks方法之后,需要手动注入对象
* @param taskRegistrar
*/
private void addFirstCron(ScheduledTaskRegistrar taskRegistrar) {
Map<String, CronTask> methodNameCronTaskMap;
//为初始scheduleReportServerStatusCronTask对象,手动添加到定时任务中
ScheduledAnnotationBeanPostProcessor scheduledAnnotationBeanPostProcessor = applicationContext.getBean(ScheduledAnnotationBeanPostProcessor.class);
SpringScheduleHelper springScheduleHelper = applicationContext.getBean(SpringScheduleHelper.class);
if (scheduledAnnotationBeanPostProcessor == null || springScheduleHelper == null) {
return;
}
scheduledAnnotationBeanPostProcessor.postProcessAfterInitialization(springScheduleHelper, "springScheduleHelper");
Set<ScheduledTask> scheduledTaskSet = scheduledAnnotationBeanPostProcessor.getScheduledTasks();
Task task = null;
ScheduledMethodRunnable scheduledMethodRunnableValue = null;
for (ScheduledTask scheduledTask : scheduledTaskSet) {
task = scheduledTask.getTask();
if (!(task instanceof CronTask)) {
continue;
}
scheduledMethodRunnableValue = (ScheduledMethodRunnable) task.getRunnable();
if (!scheduledMethodRunnableValue.getTarget().getClass().getName().equals(SpringScheduleHelper.class.getName())) {
continue;
}
CronTask cronTask = (CronTask) task;
//手动注入注册服务定时任务
taskRegistrar.addCronTask(cronTask);
//手动触发一次
taskRegistrar.scheduleCronTask(cronTask);
methodNameCronTaskMap = CLASS_NAME_METHOD_NAME_CRON_TASK_MAP.get(SpringScheduleHelper.class.getName());
if (methodNameCronTaskMap == null) {
methodNameCronTaskMap = Maps.newLinkedHashMap();
CLASS_NAME_METHOD_NAME_CRON_TASK_MAP.put(SpringScheduleHelper.class.getName(), methodNameCronTaskMap);
}
methodNameCronTaskMap.put(scheduledMethodRunnableValue.getMethod().getName(), cronTask);
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringScheduleConfig.applicationContext = applicationContext;
}
}
5.2、定时任务帮助类
package com.suyun.vehicle.helper;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.suyun.vehicle.conf.SpringScheduleConfig;
import com.suyun.vehicle.dao.ManualTaskDto;
import com.suyun.vehicle.service.ManualTaskService;
import com.suyun.vehicle.utils.NetworkUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Description:
* <p>
*
* </p>
*
* @Author: leo.xiong
* @CreateDate: 2023/7/6 19:41
* @Email: leo.xiong@suyun360.com
* @Since:
*/
@Component
public class SpringScheduleHelper implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringScheduleHelper.class);
private static final Random RANDOM = new Random();
/**
* 注册服务前缀RedisKey
*/
private static final String SERVER_REGISTER_PREFIX = "server:register:";
/**
* 分发机广播通知通道
*/
private static final String SCHEDULE_CHANNEL_NAME = "schedule:channel:name:";
/**
* 任务分发服务器,一个任务尽可能用一台服务器分发任务
*/
private static final String SCHEDULE_DISTRIBUTE_TASK_SERVER = "schedule:distribute:task:server";
/**
* 当前正在执行任务的服务器
*/
private static final String SCHEDULE_EXECUTE_SERVER = "schedule:execute:task:server";
@Resource(name = "redisTemplate")
private RedisTemplate<String, String> redisTemplate;
@Value("${schedulerx.is.test.model:true}")
private Boolean testModel;
@Value("${schedulerx.test.endpoint:}")
private String testEndpoint;
@Value("${schedulerx.group.id:}")
private String groupId;
/**
* <p>定时器实时上报服务器状态信息,每30S上报一次,在 30 * 6 = 180=3分钟,如果3分钟一直没有成功,表示该服务器不可用,剔除执行列表</p>
* <p>初始化手动触发一次,不直接触发,那么定时任务时间执行周期需要大于30s</p>
*/
@Scheduled(cron = "0/30 * * * * ?")
public void scheduleRegisterServerStatus() {
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("开始注册机器 groupId:{} IP:{}", groupId, NetworkUtil.IP);
}
redisTemplate.opsForValue().set(registerIpKey(NetworkUtil.IP), "更新时间:" + System.currentTimeMillis(), 3, TimeUnit.MINUTES);
} catch (Exception e) {
}
}
/**
* 注册机Key
*
* @param IP
* @return
*/
private String registerIpKey(String IP) {
return createRedisKey(SERVER_REGISTER_PREFIX, IP);
}
/**
* 根据任务名称和方法获取分发机器IP
*
* @param taskKey
* @return
*/
private String distributeIpKey(String taskKey) {
return createRedisKey(SCHEDULE_DISTRIBUTE_TASK_SERVER, taskKey);
}
/**
* 执行任务IP
*
* @param IP
* @return
*/
private String executeIpKey(String IP) {
return createRedisKey(SCHEDULE_EXECUTE_SERVER, IP);
}
private String createRedisKey(String prefix, String IP) {
return prefix + groupId + ":" + Optional.ofNullable(IP).orElse("");
}
/**
* 根据GroupId获取有效的注册了的服务器信息(可以执行任务)
* 1、groupId为空||没有注册的机器,表示任务不能执行
* 2、如果只有一台机器活跃,直接选择当前机器(尽可能保证高可用)
* 3、如果是广播
*
* @param manualTaskDto - broadcast 是否广播模式 广播模式,所有的机器都执行/单机模式,随机选择一台机器执行
* - fixedMachine 固定机器执行
* @return
*/
public List<String> getActiveServer(ManualTaskDto manualTaskDto) {
if (StringUtils.isEmpty(groupId)) {
return Collections.EMPTY_LIST;
}
Set<String> keys = redisTemplate.keys(registerIpKey("*"));
if (CollectionUtils.isEmpty(keys)) {
return Collections.EMPTY_LIST;
}
List<String> registerIpList = keys.parallelStream().map(key -> {
return key.replace(registerIpKey(""), "");
}).collect(Collectors.toList());
if (registerIpList.size() == 1 || manualTaskDto.getBroadcast()) {
//如果只有一台注册成功的机器,或者是广播模式,直接选择当前机器
return registerIpList;
}
if (StringUtils.isNotEmpty(manualTaskDto.getIp())) {
//指定IP模式,如果IP注册机存在直接返回
if (registerIpList.contains(manualTaskDto.getIp())) {
return Lists.newArrayList(manualTaskDto.getIp());
}
if (!manualTaskDto.getHighAvailability()) {
throw new RuntimeException("非高可用模式,指定了IP {},但IP并未注册" + manualTaskDto.getIp());
}
}
int index = 0;
if (manualTaskDto.getFixedMachine()) {
//如果固定IP模式,则直接根据任务Hash获取具体执行的IP
index = Hashing.consistentHash(manualTaskDto.getTaskKey().hashCode(), registerIpList.size());
return Lists.newArrayList(registerIpList.get(index));
}
//如果是随机模式,获取当前正在执行任务的的服务器IP,尽可能不使用当前机器,尽可能防止同一台机器同时执行多个定时任务,造成单机压力过大
Set<String> executeServerKey = redisTemplate.keys(executeIpKey("*"));
if (!CollectionUtils.isEmpty(executeServerKey)) {
//如果存在当前正在执行的服务器,尽可能排除它,防止同一台机器同时执行多个任务
Set<String> executeIpSet = executeServerKey.parallelStream().map(executeIp -> executeIp.replace(executeIpKey(""), "")).collect(Collectors.toSet());
List<String> noExecuteIpList = Lists.newArrayList();
for (String registerIp : registerIpList) {
if (executeIpSet.contains(registerIp)) {
continue;
}
noExecuteIpList.add(registerIp);
}
if (!CollectionUtils.isEmpty(noExecuteIpList)) {
index = RANDOM.nextInt(noExecuteIpList.size());
return Lists.newArrayList(noExecuteIpList.get(index));
}
}
//随机一个大于等于[0,keys.size())的int,不包括后面的数量
index = RANDOM.nextInt(keys.size());
return Lists.newArrayList(registerIpList.get(index));
}
public String sendKey() {
return SCHEDULE_CHANNEL_NAME + ":" + groupId;
}
/**
* <p>发出通知,对多台机器执行任务</p>
* <p>如果服务已使用redis消息订阅</p>
*
* @param beanName
* @param executeIpList
* @param jobParameters
*/
public void send(String beanName, List<String> executeIpList, String jobParameters) {
//消息订阅通知模式
try {
ManualTaskDto manualTaskDto = ManualTaskDto.builder().beanName(beanName).jobParameters(jobParameters).executeIpList(executeIpList).groupId(groupId);
redisTemplate.convertAndSend(sendKey(), JSON.toJSONString(manualTaskDto));
} catch (Exception e) {
LOGGER.warn("发送通知失败", e);
}
}
/**
* 是否测试环境,生产环境阿里无需配置endpoint,测试环境需要配置
*
* @return
*/
public boolean testEnv() {
if (!testModel) {
//如果设置生产环境,直接返回false,不是测试环境,否则,判断是否设置域名信息
return false;
}
return StringUtils.isNotEmpty(testEndpoint);
}
public String getGroupId() {
return groupId;
}
/**
* 是否分发任务IP,如果不是分发任务IP,无需执行任务
*
* @return
*/
public boolean distributeTaskServer(String taskKey) {
//如果下次任务执行时间为空,则表示任务没有注册
Long nextTime = SpringScheduleConfig.nextTime(taskKey);
if (nextTime == null) {
return false;
}
String key = distributeIpKey(taskKey);
Boolean trySetFlag = redisTemplate.opsForValue().setIfAbsent(key, NetworkUtil.IP);
if (trySetFlag) {
//如果尝试把自己设置为分发服务成功,则该机器分发当前任务,做定时处理,并对注册机设置失效时间,防止机器宕机,别的服务器不能使用
redisTemplate.expire(key, 3, TimeUnit.MINUTES);
return true;
}
//如果设置失败,表示已经有机器做了注册机
String distribute = redisTemplate.opsForValue().get(key);
if (NetworkUtil.IP.equals(distribute)) {
//如果IP相同,表示第二次调用,直接更新失效时间
redisTemplate.opsForValue().set(key, NetworkUtil.IP, 3, TimeUnit.MINUTES);
return true;
}
return false;
}
/**
* 消费消息
*
* @param message
* @param bytes
*/
@Override
public void onMessage(Message message, byte[] bytes) {
if (message == null || message.getBody() == null || message.getBody().length <= 0) {
return;
}
RedisSerializer<?> serializer = redisTemplate.getValueSerializer();
ManualTaskDto manualTaskDto = null;
try {
manualTaskDto = JSON.parseObject((String) serializer.deserialize(message.getBody()), ManualTaskDto.class);
} catch (SerializationException e) {
LOGGER.info("序列化失败 message: {}", message, e);
return;
}
if (CollectionUtils.isEmpty(manualTaskDto.getExecuteIpList()) || !manualTaskDto.getExecuteIpList().contains(NetworkUtil.IP)) {
return;
}
ManualTaskService.SubManualTaskService subManualTaskService = SpringScheduleConfig.getBean(manualTaskDto.getBeanName(), ManualTaskService.SubManualTaskService.class);
if (subManualTaskService == null) {
return;
}
//如果当前服务器IP是需要执行任务的IP,那么直接执行
final String jobParameters = manualTaskDto.getJobParameters();
executeServer(manualTaskDto, () -> {
return subManualTaskService.publishExecute(jobParameters);
}, System.currentTimeMillis());
}
/**
* 执行任务,并且保存正在执行的机器IP,下次选择是尽可能不选择正在执行任务的IP,防止时间过长
*
* @param manualTaskDto
* @param supplier
* @param beginTime
* @return
*/
public Boolean executeServer(ManualTaskDto manualTaskDto, Supplier<Boolean> supplier, long beginTime) {
Boolean r = null;
String executeServerKey = executeIpKey(NetworkUtil.IP);
try {
/**@Scheduled 任务必须抓取异常,否则阻塞任务队列,任务不在执行{@link com.suyun.vehicle.conf.SpringScheduleConfig}**/
boolean executeFlag = redisTemplate.opsForValue().setIfAbsent(executeServerKey, manualTaskDto.getTaskKey());
if (executeFlag) {
//执行任务机器最多90s不能通过随机直接选择为执行及,除非所有的机器都是正在执行任务
redisTemplate.expire(executeServerKey, 90, TimeUnit.SECONDS);
}
Long bussinessTime = System.currentTimeMillis();
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().debug("开始执行用户定义的业务逻辑 groupId: {}", groupId);
}
r = supplier.get();
if (r == null) {
return true;
}
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().debug("结束执行用户定义的业务逻辑 groupId: {} 任务执行时间:{} ms", groupId, System.currentTimeMillis() - bussinessTime);
}
} catch (Throwable e) {
//Throwable异常大于Exception,是Exception的父级,包括了Error等,如调用飞书接口,发送通知失败,会抛出Error,使用Exception就会抓取失败,防止有些异常抓取失败,导致线程阻塞
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().warn("任务执行失败:manualTaskDto: {}", manualTaskDto, e);
}
return false;
} finally {
redisTemplate.delete(executeServerKey);
}
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().info("结束任务 groupId: {} 任务执行时间:{} ms", groupId, System.currentTimeMillis() - beginTime);
}
return Optional.ofNullable(r).orElse(Boolean.FALSE);
}
}
5.3、参数类信息
package com.suyun.vehicle.dao;
import com.suyun.vehicle.helper.SpringScheduleHelper;
import org.slf4j.Logger;
import java.io.Serializable;
import java.util.List;
/**
* Description:
* <p>
*
* </p>
*
* @Author: leo.xiong
* @CreateDate: 2023/7/4 16:58
* @Email: leo.xiong@suyun360.com
* @Since:
*/
public class ManualTaskDto implements Serializable {
private static final long serialVersionUID = -2638699521129899896L;
/**
* 通知触发执行,否则不执行具体逻辑,默认false,如果设置为true,表示直接执行
*/
private Boolean publishExecute = false;
private SpringScheduleHelper springScheduleHelper;
/**
* 信号过滤
*/
private String groupId;
/**
* 是否高可用,当机器信息不存在是,直接使用分发者作为机器
*/
private Boolean highAvailability = true;
/**
* 定时器对象logger
*/
private Logger logger;
/**
* 对象名称
*/
private String beanName;
/**
* 获取任务的key {@link com.suyun.vehicle.conf.SpringScheduleConfig}
*/
private String taskKey;
/**
* <p>是否广播模式(集群时每台服务器都执行)/单机模式</p>
* <p>默认单机模式</p>
* <p>如果valueOperations不为空,则默认使用单机模式,可以通过后置broadcast方法修改</p>
*/
private Boolean broadcast = false;
/**
* <p>是否某个任务在注册机器未增减的时候使用固定的机器执行,默认不是,使用随机模式,可以自定义实现轮训模式,记录上一次执行的IP {@link com.suyun.vehicle.conf.SpringScheduleConfig}</p>
* <p>使用广播模式,指定机器属性无效</p>
*/
private Boolean fixedMachine = false;
/**
* 直接指定IP模式
*/
private String ip;
/**
* 添加需要执行任务的IP集合
*/
private List<String> executeIpList;
private String jobParameters;
/**
* 是否执行测试环境,默认不执行
*/
private Boolean testExecute = false;
public ManualTaskDto() {
}
/**
* @param springScheduleHelper 执行任务帮助类
*/
private ManualTaskDto(String beanName, SpringScheduleHelper springScheduleHelper) {
this.springScheduleHelper = springScheduleHelper;
this.beanName = beanName;
}
public ManualTaskDto(String beanName, List<String> executeIpList, String jobParameters) {
this.beanName = beanName;
this.executeIpList = executeIpList;
this.jobParameters = jobParameters;
}
public Logger getLogger() {
return logger;
}
public ManualTaskDto logger(Logger logger) {
this.logger = logger;
return this;
}
public ManualTaskDto taskKey(String taskKey) {
this.taskKey = taskKey;
return this;
}
public String getTaskKey() {
return taskKey;
}
public SpringScheduleHelper getSpringScheduleHelper() {
return springScheduleHelper;
}
public static ManualTaskDto builder() {
return new ManualTaskDto(null, null);
}
public static ManualTaskDto builder(SpringScheduleHelper springScheduleHelper) {
return new ManualTaskDto(null, springScheduleHelper);
}
public static ManualTaskDto builder(String beanName, SpringScheduleHelper springScheduleHelper) {
return new ManualTaskDto(beanName, springScheduleHelper);
}
public Boolean getBroadcast() {
return broadcast;
}
public ManualTaskDto broadcast(Boolean broadcast) {
this.broadcast = broadcast;
return this;
}
public Boolean getFixedMachine() {
return fixedMachine;
}
public ManualTaskDto fixedMachine(Boolean fixedMachine) {
this.fixedMachine = fixedMachine;
return this;
}
public String getIp() {
return ip;
}
public ManualTaskDto ip(String ip) {
this.ip = ip;
return this;
}
public Boolean getPublishExecute() {
return publishExecute;
}
public ManualTaskDto publishExecute(Boolean publishExecute) {
this.publishExecute = publishExecute;
return this;
}
public List<String> getExecuteIpList() {
return executeIpList;
}
public ManualTaskDto executeIpList(List<String> executeIpList) {
this.executeIpList = executeIpList;
return this;
}
public String getBeanName() {
return beanName;
}
public ManualTaskDto beanName(String beanName) {
this.beanName = beanName;
return this;
}
public String getJobParameters() {
return jobParameters;
}
public ManualTaskDto jobParameters(String jobParameters) {
this.jobParameters = jobParameters;
return this;
}
public Boolean getTestExecute() {
return testExecute;
}
public ManualTaskDto testExecute(Boolean testExecute) {
this.testExecute = testExecute;
return this;
}
public String getGroupId() {
return groupId;
}
public ManualTaskDto groupId(String groupId) {
this.groupId = groupId;
return this;
}
public Boolean getHighAvailability() {
return highAvailability;
}
public ManualTaskDto setHighAvailability(Boolean highAvailability) {
this.highAvailability = highAvailability;
return this;
}
@Override
public String toString() {
return "ManualTaskDto{" +
"publishExecute=" + publishExecute +
", springScheduleHelper=" + springScheduleHelper +
", groupId='" + groupId + '\'' +
", beanName='" + beanName + '\'' +
", taskKey='" + taskKey + '\'' +
", broadcast=" + broadcast +
", fixedMachine=" + fixedMachine +
", executeIpList=" + executeIpList +
", jobParameters='" + jobParameters + '\'' +
", testExecute=" + testExecute +
'}';
}
}
5.4、定义调度实现接口
package com.suyun.vehicle.service;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.suyun.vehicle.conf.SpringScheduleConfig;
import com.suyun.vehicle.dao.ManualTaskDto;
import com.suyun.vehicle.helper.SpringScheduleHelper;
import com.suyun.vehicle.utils.NetworkUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* Description:
* <p>
* 手动定时任务处理调用阿里云定时器 {@link com.suyun.vehicle.conf.SpringScheduleConfig}
* </p>
*
* @Author: leo.xiong
* @CreateDate: 2023/7/4 9:56
* @Email: leo.xiong@suyun360.com
* @Since:
*/
@FunctionalInterface
public interface ManualTaskService {
String JOB_NAME = "jobInstance";
String PARAM = "param";
/**
* 手动任务执行前做一定的校验->获取对应执行任务的对象,手动执行一次
*
* @param instanceParameters
* @return
* @throws Exception
*/
Object manualProcess(String instanceParameters) throws Exception;
/**
* 校验参数
*
* @param instanceParameters
* @return
*/
default Object validParam(String instanceParameters) {
if (StringUtils.isEmpty(instanceParameters)) {
return "执行任务参数不能为空";
}
if (!instanceParameters.contains("{") || !instanceParameters.contains("}")) {
return "执行任务参数不是JSON字符串";
}
Map<String, String> keyValue = null;
try {
keyValue = JSON.parseObject(instanceParameters, Map.class);
} catch (Exception e) {
return "执行任务参数不是JSON字符串 instanceParameters:" + instanceParameters;
}
if (!keyValue.containsKey(JOB_NAME)) {
return "任务实例不能为空 instanceParameters:" + instanceParameters;
}
return keyValue;
}
/**
* 子任务执行
*/
interface SubManualTaskService {
/**
* 定时执行 @Scheduled
*
* @return
*/
void process();
/**
* 无参执行
*
* @return
*/
default Object processing() {
return processing(null);
}
/**
* 根据参数执行任务
*
* @param jobParameters
* @return
*/
Object processing(String jobParameters);
/**
* Redis消息订阅执行方法
*
* @param jobParameters
* @return
*/
Boolean publishExecute(String jobParameters);
/**
* 开始执行定时任务
*
* @param manualTaskDto 参数信息
* @param supplier 执行的具体业务逻辑
* @return
*/
default Boolean execute(ManualTaskDto manualTaskDto, Supplier<Boolean> supplier) {
return execute(manualTaskDto, null, supplier);
}
/**
* 开始执行定时任务
*
* @param manualTaskDto 参数信息
* @param methodName 方法名称 (一个类中只有一个@Schedlue注解可以为空,否则需要根据方法名称确定调度任务)
* @param supplier 执行的具体业务逻辑
* @return
*/
default Boolean execute(ManualTaskDto manualTaskDto, String methodName, Supplier<Boolean> supplier) {
Long beginTime = System.currentTimeMillis();
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().debug("开始任务 name:{}", this.getClass().getSimpleName());
}
if (!before(manualTaskDto, methodName)) {
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().debug("任务不执行");
}
return true;
}
if (!manualTaskDto.getPublishExecute()) {
if (CollectionUtils.isEmpty(manualTaskDto.getExecuteIpList())) {
return false;
}
//如果不是直接执行,需要发出通知
manualTaskDto.getSpringScheduleHelper().send(manualTaskDto.getBeanName(), manualTaskDto.getExecuteIpList(), manualTaskDto.getJobParameters());
return true;
}
if (!manualTaskDto.getExecuteIpList().contains(NetworkUtil.IP)) {
//不是执行任务的IP,直接返回
return true;
}
return manualTaskDto.getSpringScheduleHelper().executeServer(manualTaskDto, supplier, beginTime);
}
/**
* 获取能执行任务的IP信息
*
* @param manualTaskDto
* @param methodName
* @return
*/
default boolean before(ManualTaskDto manualTaskDto, String methodName) {
String className = this.getClass().getName();
if (StringUtils.isNotEmpty(methodName)) {
manualTaskDto.taskKey(className + ":" + methodName);
} else {
manualTaskDto.taskKey(className);
}
if (StringUtils.isEmpty(manualTaskDto.getBeanName())) {
int lastIndex = className.lastIndexOf(".");
//首字母小写
manualTaskDto.beanName(className.substring(lastIndex + 1, lastIndex + 2).toLowerCase() + className.substring(lastIndex + 2));
}
if (!manualTaskDto.getTestExecute() && manualTaskDto.getSpringScheduleHelper().testEnv()) {
//如果用户未设置需要执行测试环境,代码测试,并且服务器确实是测试黄精时,无需执行,测试环境需要调试,直接设置状态为true,测试环境也会执行
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().debug("测试环境任务无需执行");
}
//如果测试环境无需执行
return false;
}
if (manualTaskDto.getPublishExecute()) {
//如果是直接执行
manualTaskDto.executeIpList(Lists.newArrayList(NetworkUtil.IP));
return true;
}
if (!manualTaskDto.getSpringScheduleHelper().distributeTaskServer(manualTaskDto.getTaskKey())) {
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().debug("如果不是分发任务服务器,直接返回");
}
//如果测试环境无需执行
return false;
}
List<String> executorIpList = manualTaskDto.getSpringScheduleHelper().getActiveServer(manualTaskDto);
if (CollectionUtils.isEmpty(executorIpList)) {
if (manualTaskDto.getHighAvailability()) {
//没有有效的IP,直接使用自己,保证高可用
manualTaskDto.publishExecute(true);
//如果是直接执行
manualTaskDto.executeIpList(Lists.newArrayList(NetworkUtil.IP));
return true;
}
if (manualTaskDto.getLogger() != null) {
manualTaskDto.getLogger().info("不存在有效的服务器资源信息 groupId:{} taskKey:{}", manualTaskDto.getSpringScheduleHelper().getGroupId(), manualTaskDto.getTaskKey());
return false;
}
}
/**
* 如果只有一个活跃的IP
* 1、活跃IP=当前服务器的IP,可以直接执行
* 2、随机的IP或者指定的IP活跃,但并非服务器IP,需要通过redis消息推送执行
*/
if (executorIpList.size() == 1 && NetworkUtil.IP.equals(executorIpList.get(0))) {
//只有一台服务器活动,且IP相同,直接调用
manualTaskDto.publishExecute(true);
//如果是直接执行
manualTaskDto.executeIpList(Lists.newArrayList(NetworkUtil.IP));
return true;
}
manualTaskDto.executeIpList(executorIpList);
return true;
}
}
}
5.5、手动调用其他调度任务
package com.suyun.vehicle.task;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.suyun.vehicle.dao.ManualTaskDto;
import com.suyun.vehicle.helper.SpringScheduleHelper;
import com.suyun.vehicle.service.ManualTaskService;
import com.suyun.vehicle.task.service.retry.FailedMessageHandleTimerTaskService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* Description:
* <p>
* 拉取失败任务,进行重试,多次重试失败,发往重试失败落盘topic
* 使用广播模式
* </p>
*
* @Author: leo.xiong
* @CreateDate: 2023/7/6 14:20
* @Email: leo.xiong@suyun360.com
* @Since:
*/
@Component
public class FailedMessageHandleTimerTask extends JavaProcessor implements ManualTaskService.SubManualTaskService {
private static final Logger LOGGER = LoggerFactory.getLogger(FailedMessageHandleTimerTask.class);
@Autowired
private FailedMessageHandleTimerTaskService failedMessageHandleTimerTaskService;
@Autowired
private SpringScheduleHelper springScheduleHelper;
@Override
public ProcessResult process(JobContext context) {
return processing(StringUtils.defaultIfEmpty(context.getInstanceParameters(), context.getJobParameters()));
}
@Override
@Scheduled(cron = "0 */2 * * * ?")
public void process() {
processing();
}
@Override
public ProcessResult processing(String jobParameters) {
ManualTaskDto manualTaskDto = ManualTaskDto.builder(springScheduleHelper).logger(LOGGER).jobParameters(jobParameters);
return new ProcessResult(execute(manualTaskDto, () -> publishExecute(jobParameters)));
}
@Override
public Boolean publishExecute(String jobParameters) {
return failedMessageHandleTimerTaskService.process(jobParameters);
}
}
6、Spring定时器实现原理
7、扩展实现
所有的定时任务都使用一秒钟调用一次,但需要另外配置一个真正的执行时间,分发机把每次最后一次执行时间存入Redis,是否需要执行,不执行,立即完成任务,通过最后一次执行时间跟当前系统时间做判断,这样在分发机宕机的失效时间之内就可以正常执行任务,而且执行的任务时间也是定时任务真正的执行时间,但它会导致线程池执行任务很频繁。
更多推荐
所有评论(0)