1、当前现状

  1. 所有的任务已经迁移到阿里云Schedulerx;
  2. 阿里云Schedulerx是按照调用次数收费,有些任务每秒调用一次,费用太高;

2、明确需求

  1. 需要把执行非常频繁的定时任务从阿里云迁移(阿里云收费根据调用次数,且可以针对单个任务设置是否启动定时调度);
  2. 服务是集群部署,存在广播模式和单机模式,大部分是单机模式;
  3. 有时任务失败,需要人工手动触发;
  4. 对新增的定时任务,不想在阿里云控制台过多配置,但又需要手动触发;

3、实现思路

  1. 保留原有阿里云定时任务,禁用执行非常频繁的手动任务,禁用后不会自动执行,但可以手动触发;

  2. 禁用的任务使用Spring的定时调度工具,(引入开源的调度工具比较麻烦);

  3. 需要考虑在集群环境下,广播和单机模式的实现,引入Redis锁机制;

  4. 可以定义一个任务,用来调起真正执行的认为,也使用手动触发模式;

4、注意事项

集群环境,定义了一个任务执行周期 3分钟一次,怎么样保证都是在3分钟执行一次,机器之间并不知道别的机器是否已经执行过了任务

每个任务,用一台机器作为任务分发机器,除非宕机,重新选择;

  1. 添加一个调度器,30s执行一次,一直往redis添加IP信息,设置3分钟过期,用于判断服务是否可用,超过3分钟没有更新数据,判断当前IP服务已宕机;
  2. 分发任务IP:在执行任务之前,setIf(当前任务,IP),如果能设置成功,设置失效时间大于当前调度下次执行时间30s后,如果不能设置,查回IP,如果IP相同,更新任务IP;
  3. 如果不是分发任务IP,直接完成,否则,查询活跃IP,之后根据广播通知方式,指定IP等策略获取有效的执行任务IP;
  4. 如果当前有效IP只有一个,并且等于分发IP,直接执行;
  5. IP多个或不同,直接发出redis通知,执行完成;
  6. Redis消息订阅,直接通过beanName获取对象,调用单独执行逻辑方法;
package com.suyun.vehicle.conf;

import com.google.common.collect.Maps;
import com.suyun.vehicle.helper.SpringScheduleHelper;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.Task;
import org.springframework.scheduling.support.ScheduledMethodRunnable;
import org.springframework.scheduling.support.SimpleTriggerContext;
import org.springframework.util.CollectionUtils;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Description:
 * <p>
 *
 * </p>
 *
 * @Author: leo.xiong
 * @CreateDate: 2023/7/4 14:06
 * @Email: leo.xiong@suyun360.com
 * @Since:
 */
@EnableScheduling
@org.springframework.context.annotation.Configuration("springScheduleConfig")
public class SpringScheduleConfig implements SchedulingConfigurer, ApplicationListener, ApplicationContextAware {

    @Value("${scheduler.task.size:8}")
    private int taskSize;

    private static ScheduledTaskRegistrar TASK_REGISTRAR;

    private static ApplicationContext applicationContext;

    private static final Map<String, Map<String, CronTask>> CLASS_NAME_METHOD_NAME_CRON_TASK_MAP = Maps.newHashMap();

    /**
     * <p>注入线程池</p>
     * <p>{@link ScheduledAnnotationBeanPostProcessor}实现Spring定时器</p>
     * <p>ScheduledAnnotationBeanPostProcessor->MergedBeanDefinitionPostProcessor->BeanPostProcessor扫描注解@Schedule,生成任务</p>
     * <p>ScheduledAnnotationBeanPostProcessor->SmartInitializingSingleton和ApplicationListener接口,Spring Bean注入,还未发出通知之前afterSingletonsInstantiated执行一次,Spring 通知执行一次</p>
     * <p>1、默认使用一个线程执行,如果多个任务都是每秒执行一次获取有任务执行时间过长,会造成任务延迟,严重直接阻塞任务;</p>
     * <p>2、重写configureTasks方法,注入线程池</p>
     * <p>3、异步执行需要开启@EnableAsync异步注解,方法上添加@Async注解</p>
     * <p>4、ScheduledThreadPoolExecutor.ScheduledFutureTask方法,首选判断任务是否执行,不管手动还是定时,如果是,跳过这次执行,2判断是否非周期调用,直接执行,如果周期调用,执行,如果call()抛出异常,不在重新设置下次任务执行时间,定时任务不在执行</p>
     *
     * @param taskRegistrar
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskScheduler());
        addFirstCron(taskRegistrar);
        TASK_REGISTRAR = taskRegistrar;
    }

    /**
     * <p>{@link ScheduledThreadPoolExecutor}和{@link ThreadPoolTaskScheduler}只能使用一个,IOC容器互斥</p>
     * <p>ScheduledThreadPoolExecutor只能使用设置固定核心线程长度的线程池,灵活性不足</p>
     * <p>ThreadPoolTaskScheduler实例化Bean时,实现了InitializingBean接口,实现了异步调度接口</p>
     * <p>最终执行任务的线程池都为{@link java.util.concurrent.ScheduledExecutorService}实现的{@link ScheduledThreadPoolExecutor}实例</p>
     *
     * @return
     */
    @Bean
    @DependsOn(value = {"springScheduleHelper"})
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 配置线程池大小,根据任务数量定制
        taskScheduler.setPoolSize(taskSize);
        // 线程名称前缀
        taskScheduler.setThreadNamePrefix("suyun-ThreadPoolTaskScheduler-thread-");
        // 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
        // 线程池关闭前最大等待时间,确保最后一定关闭,线程任务最大执行时间30分钟
        taskScheduler.setAwaitTerminationSeconds(1800);
        // 线程池关闭时等待所有任务完成
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        // 任务拒绝策略 任务->判断核心线程是否存在空闲(直接执行)->判断队列是否已满(已满拒绝,否则放入队列)->判断是否达到最大线程(没有,创建执行,否则返回,等待可用线程)
        taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        return taskScheduler;
    }

    /**
     * 预计下次执行时间
     *
     * @param className
     */
    public static Long nextTime(String className) {
        if (!className.contains(":")) {
            return nextTime(className, null);
        }
        String[] classNameMethodNames = className.split(":");
        return nextTime(classNameMethodNames[0], classNameMethodNames[1]);
    }

    /**
     * 预计下次执行时间
     *
     * @param className  对象名称类名
     * @param methodName 执行的方法名,如果类中只有一个@Scheduler,则方法名可以为空
     */
    public static Long nextTime(String className, String methodName) {
        CronTask cronTask = getCronTask(className, methodName);
        if (cronTask == null) {
            return null;
        }
        return cronTask.getTrigger().nextExecutionTime(new SimpleTriggerContext()).getTime();
    }

    /**
     * 获取任务队列
     *
     * @param className
     * @param methodName
     * @return
     */
    private static CronTask getCronTask(String className, String methodName) {
        LinkedHashMap<String, CronTask> methodNameCronTaskMap = (LinkedHashMap) CLASS_NAME_METHOD_NAME_CRON_TASK_MAP.get(className);
        if (CollectionUtils.isEmpty(methodNameCronTaskMap)) {
            return null;
        }
        if (methodNameCronTaskMap.size() == 1) {
            for (Map.Entry<String, CronTask> methodNameCronTaskEntry : methodNameCronTaskMap.entrySet()) {
                return methodNameCronTaskEntry.getValue();
            }
        }
        if (StringUtils.isEmpty(methodName)) {
            return null;
        }
        return methodNameCronTaskMap.get(methodName);
    }

    /**
     * 根据名称获取容器对象信息
     *
     * @param name
     * @param <T>
     * @return
     */
    public static <T> T getBean(String name) {
        return (T) applicationContext.getBean(name);
    }

    /**
     * 根据类型获取容器对象信息
     *
     * @param requiredType
     * @param <T>
     * @return
     */
    public static <T> T getBean(Class<T> requiredType) {
        return applicationContext.getBean(requiredType);
    }

    /**
     * 根据名称类型获取容器对象信息
     *
     * @param requiredType
     * @param <T>
     * @return
     */
    public static <T> T getBean(String name, Class<T> requiredType) {
        return applicationContext.getBean(name, requiredType);
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (TASK_REGISTRAR == null || CollectionUtils.isEmpty(TASK_REGISTRAR.getCronTaskList())) {
            return;
        }
        ScheduledMethodRunnable scheduledMethodRunnable = null;
        Map<String, CronTask> methodNameCronTaskMap = null;
        String className = null, methodName = null;
        for (CronTask cronTask : TASK_REGISTRAR.getCronTaskList()) {
            scheduledMethodRunnable = (ScheduledMethodRunnable) cronTask.getRunnable();
            className = scheduledMethodRunnable.getTarget().getClass().getName();
            methodNameCronTaskMap = CLASS_NAME_METHOD_NAME_CRON_TASK_MAP.get(className);
            if (methodNameCronTaskMap == null) {
                methodNameCronTaskMap = Maps.newLinkedHashMap();
                CLASS_NAME_METHOD_NAME_CRON_TASK_MAP.put(className, methodNameCronTaskMap);
            }
            methodName = scheduledMethodRunnable.getMethod().getName();
            methodNameCronTaskMap.put(methodName, cronTask);
        }
    }

    /**
     * 手动添加{@link SpringScheduleHelper}任务进入定时器
     * SpringScheduleHelper bean注入在configureTasks方法之后,需要手动注入对象
     * @param taskRegistrar
     */
    private void addFirstCron(ScheduledTaskRegistrar taskRegistrar) {
        Map<String, CronTask> methodNameCronTaskMap;
        //为初始scheduleReportServerStatusCronTask对象,手动添加到定时任务中
        ScheduledAnnotationBeanPostProcessor scheduledAnnotationBeanPostProcessor = applicationContext.getBean(ScheduledAnnotationBeanPostProcessor.class);
        SpringScheduleHelper springScheduleHelper = applicationContext.getBean(SpringScheduleHelper.class);
        if (scheduledAnnotationBeanPostProcessor == null || springScheduleHelper == null) {
            return;
        }
        scheduledAnnotationBeanPostProcessor.postProcessAfterInitialization(springScheduleHelper, "springScheduleHelper");
        Set<ScheduledTask> scheduledTaskSet = scheduledAnnotationBeanPostProcessor.getScheduledTasks();
        Task task = null;
        ScheduledMethodRunnable scheduledMethodRunnableValue = null;
        for (ScheduledTask scheduledTask : scheduledTaskSet) {
            task = scheduledTask.getTask();
            if (!(task instanceof CronTask)) {
                continue;
            }
            scheduledMethodRunnableValue = (ScheduledMethodRunnable) task.getRunnable();
            if (!scheduledMethodRunnableValue.getTarget().getClass().getName().equals(SpringScheduleHelper.class.getName())) {
                continue;
            }
            CronTask cronTask = (CronTask) task;
            //手动注入注册服务定时任务
            taskRegistrar.addCronTask(cronTask);
            //手动触发一次
            taskRegistrar.scheduleCronTask(cronTask);
            methodNameCronTaskMap = CLASS_NAME_METHOD_NAME_CRON_TASK_MAP.get(SpringScheduleHelper.class.getName());
            if (methodNameCronTaskMap == null) {
                methodNameCronTaskMap = Maps.newLinkedHashMap();
                CLASS_NAME_METHOD_NAME_CRON_TASK_MAP.put(SpringScheduleHelper.class.getName(), methodNameCronTaskMap);
            }
            methodNameCronTaskMap.put(scheduledMethodRunnableValue.getMethod().getName(), cronTask);
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringScheduleConfig.applicationContext = applicationContext;
    }
}

5.2、定时任务帮助类

package com.suyun.vehicle.helper;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.suyun.vehicle.conf.SpringScheduleConfig;
import com.suyun.vehicle.dao.ManualTaskDto;
import com.suyun.vehicle.service.ManualTaskService;
import com.suyun.vehicle.utils.NetworkUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Description:
 * <p>
 *
 * </p>
 *
 * @Author: leo.xiong
 * @CreateDate: 2023/7/6 19:41
 * @Email: leo.xiong@suyun360.com
 * @Since:
 */
@Component
public class SpringScheduleHelper implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpringScheduleHelper.class);

    private static final Random RANDOM = new Random();
    /**
     * 注册服务前缀RedisKey
     */
    private static final String SERVER_REGISTER_PREFIX = "server:register:";
    /**
     * 分发机广播通知通道
     */
    private static final String SCHEDULE_CHANNEL_NAME = "schedule:channel:name:";
    /**
     * 任务分发服务器,一个任务尽可能用一台服务器分发任务
     */
    private static final String SCHEDULE_DISTRIBUTE_TASK_SERVER = "schedule:distribute:task:server";
    /**
     * 当前正在执行任务的服务器
     */
    private static final String SCHEDULE_EXECUTE_SERVER = "schedule:execute:task:server";

    @Resource(name = "redisTemplate")
    private RedisTemplate<String, String> redisTemplate;

    @Value("${schedulerx.is.test.model:true}")
    private Boolean testModel;

    @Value("${schedulerx.test.endpoint:}")
    private String testEndpoint;

    @Value("${schedulerx.group.id:}")
    private String groupId;

    /**
     * <p>定时器实时上报服务器状态信息,每30S上报一次,在 30 * 6 = 180=3分钟,如果3分钟一直没有成功,表示该服务器不可用,剔除执行列表</p>
     * <p>初始化手动触发一次,不直接触发,那么定时任务时间执行周期需要大于30s</p>
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void scheduleRegisterServerStatus() {
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("开始注册机器 groupId:{} IP:{}", groupId, NetworkUtil.IP);
            }
            redisTemplate.opsForValue().set(registerIpKey(NetworkUtil.IP), "更新时间:" + System.currentTimeMillis(), 3, TimeUnit.MINUTES);
        } catch (Exception e) {

        }
    }

    /**
     * 注册机Key
     *
     * @param IP
     * @return
     */
    private String registerIpKey(String IP) {
        return createRedisKey(SERVER_REGISTER_PREFIX, IP);
    }

    /**
     * 根据任务名称和方法获取分发机器IP
     *
     * @param taskKey
     * @return
     */
    private String distributeIpKey(String taskKey) {
        return createRedisKey(SCHEDULE_DISTRIBUTE_TASK_SERVER, taskKey);
    }

    /**
     * 执行任务IP
     *
     * @param IP
     * @return
     */
    private String executeIpKey(String IP) {
        return createRedisKey(SCHEDULE_EXECUTE_SERVER, IP);
    }

    private String createRedisKey(String prefix, String IP) {
        return prefix + groupId + ":" + Optional.ofNullable(IP).orElse("");
    }

    /**
     * 根据GroupId获取有效的注册了的服务器信息(可以执行任务)
     * 1、groupId为空||没有注册的机器,表示任务不能执行
     * 2、如果只有一台机器活跃,直接选择当前机器(尽可能保证高可用)
     * 3、如果是广播
     *
     * @param manualTaskDto - broadcast 是否广播模式 广播模式,所有的机器都执行/单机模式,随机选择一台机器执行
     *                      - fixedMachine 固定机器执行
     * @return
     */
    public List<String> getActiveServer(ManualTaskDto manualTaskDto) {
        if (StringUtils.isEmpty(groupId)) {
            return Collections.EMPTY_LIST;
        }
        Set<String> keys = redisTemplate.keys(registerIpKey("*"));
        if (CollectionUtils.isEmpty(keys)) {
            return Collections.EMPTY_LIST;
        }
        List<String> registerIpList = keys.parallelStream().map(key -> {
            return key.replace(registerIpKey(""), "");
        }).collect(Collectors.toList());
        if (registerIpList.size() == 1 || manualTaskDto.getBroadcast()) {
            //如果只有一台注册成功的机器,或者是广播模式,直接选择当前机器
            return registerIpList;
        }
        if (StringUtils.isNotEmpty(manualTaskDto.getIp())) {
            //指定IP模式,如果IP注册机存在直接返回
            if (registerIpList.contains(manualTaskDto.getIp())) {
                return Lists.newArrayList(manualTaskDto.getIp());
            }
            if (!manualTaskDto.getHighAvailability()) {
                throw new RuntimeException("非高可用模式,指定了IP {},但IP并未注册" + manualTaskDto.getIp());
            }
        }
        int index = 0;
        if (manualTaskDto.getFixedMachine()) {
            //如果固定IP模式,则直接根据任务Hash获取具体执行的IP
            index = Hashing.consistentHash(manualTaskDto.getTaskKey().hashCode(), registerIpList.size());
            return Lists.newArrayList(registerIpList.get(index));
        }
        //如果是随机模式,获取当前正在执行任务的的服务器IP,尽可能不使用当前机器,尽可能防止同一台机器同时执行多个定时任务,造成单机压力过大
        Set<String> executeServerKey = redisTemplate.keys(executeIpKey("*"));
        if (!CollectionUtils.isEmpty(executeServerKey)) {
            //如果存在当前正在执行的服务器,尽可能排除它,防止同一台机器同时执行多个任务
            Set<String> executeIpSet = executeServerKey.parallelStream().map(executeIp -> executeIp.replace(executeIpKey(""), "")).collect(Collectors.toSet());
            List<String> noExecuteIpList = Lists.newArrayList();
            for (String registerIp : registerIpList) {
                if (executeIpSet.contains(registerIp)) {
                    continue;
                }
                noExecuteIpList.add(registerIp);
            }
            if (!CollectionUtils.isEmpty(noExecuteIpList)) {
                index = RANDOM.nextInt(noExecuteIpList.size());
                return Lists.newArrayList(noExecuteIpList.get(index));
            }
        }
        //随机一个大于等于[0,keys.size())的int,不包括后面的数量
        index = RANDOM.nextInt(keys.size());
        return Lists.newArrayList(registerIpList.get(index));
    }

    public String sendKey() {
        return SCHEDULE_CHANNEL_NAME + ":" + groupId;
    }

    /**
     * <p>发出通知,对多台机器执行任务</p>
     * <p>如果服务已使用redis消息订阅</p>
     *
     * @param beanName
     * @param executeIpList
     * @param jobParameters
     */
    public void send(String beanName, List<String> executeIpList, String jobParameters) {
        //消息订阅通知模式
        try {
            ManualTaskDto manualTaskDto = ManualTaskDto.builder().beanName(beanName).jobParameters(jobParameters).executeIpList(executeIpList).groupId(groupId);
            redisTemplate.convertAndSend(sendKey(), JSON.toJSONString(manualTaskDto));
        } catch (Exception e) {
            LOGGER.warn("发送通知失败", e);
        }
    }

    /**
     * 是否测试环境,生产环境阿里无需配置endpoint,测试环境需要配置
     *
     * @return
     */
    public boolean testEnv() {
        if (!testModel) {
            //如果设置生产环境,直接返回false,不是测试环境,否则,判断是否设置域名信息
            return false;
        }
        return StringUtils.isNotEmpty(testEndpoint);
    }

    public String getGroupId() {
        return groupId;
    }

    /**
     * 是否分发任务IP,如果不是分发任务IP,无需执行任务
     *
     * @return
     */
    public boolean distributeTaskServer(String taskKey) {
        //如果下次任务执行时间为空,则表示任务没有注册
        Long nextTime = SpringScheduleConfig.nextTime(taskKey);
        if (nextTime == null) {
            return false;
        }
        String key = distributeIpKey(taskKey);
        Boolean trySetFlag = redisTemplate.opsForValue().setIfAbsent(key, NetworkUtil.IP);
        if (trySetFlag) {
            //如果尝试把自己设置为分发服务成功,则该机器分发当前任务,做定时处理,并对注册机设置失效时间,防止机器宕机,别的服务器不能使用
            redisTemplate.expire(key, 3, TimeUnit.MINUTES);
            return true;
        }
        //如果设置失败,表示已经有机器做了注册机
        String distribute = redisTemplate.opsForValue().get(key);
        if (NetworkUtil.IP.equals(distribute)) {
            //如果IP相同,表示第二次调用,直接更新失效时间
            redisTemplate.opsForValue().set(key, NetworkUtil.IP, 3, TimeUnit.MINUTES);
            return true;
        }
        return false;
    }

    /**
     * 消费消息
     *
     * @param message
     * @param bytes
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        if (message == null || message.getBody() == null || message.getBody().length <= 0) {
            return;
        }
        RedisSerializer<?> serializer = redisTemplate.getValueSerializer();
        ManualTaskDto manualTaskDto = null;
        try {
            manualTaskDto = JSON.parseObject((String) serializer.deserialize(message.getBody()), ManualTaskDto.class);
        } catch (SerializationException e) {
            LOGGER.info("序列化失败 message: {}", message, e);
            return;
        }
        if (CollectionUtils.isEmpty(manualTaskDto.getExecuteIpList()) || !manualTaskDto.getExecuteIpList().contains(NetworkUtil.IP)) {
            return;
        }
        ManualTaskService.SubManualTaskService subManualTaskService = SpringScheduleConfig.getBean(manualTaskDto.getBeanName(), ManualTaskService.SubManualTaskService.class);
        if (subManualTaskService == null) {
            return;
        }
        //如果当前服务器IP是需要执行任务的IP,那么直接执行
        final String jobParameters = manualTaskDto.getJobParameters();
        executeServer(manualTaskDto, () -> {
            return subManualTaskService.publishExecute(jobParameters);
        }, System.currentTimeMillis());
    }

    /**
     * 执行任务,并且保存正在执行的机器IP,下次选择是尽可能不选择正在执行任务的IP,防止时间过长
     *
     * @param manualTaskDto
     * @param supplier
     * @param beginTime
     * @return
     */
    public Boolean executeServer(ManualTaskDto manualTaskDto, Supplier<Boolean> supplier, long beginTime) {
        Boolean r = null;
        String executeServerKey = executeIpKey(NetworkUtil.IP);
        try {

            /**@Scheduled 任务必须抓取异常,否则阻塞任务队列,任务不在执行{@link com.suyun.vehicle.conf.SpringScheduleConfig}**/
            boolean executeFlag = redisTemplate.opsForValue().setIfAbsent(executeServerKey, manualTaskDto.getTaskKey());
            if (executeFlag) {
                //执行任务机器最多90s不能通过随机直接选择为执行及,除非所有的机器都是正在执行任务
                redisTemplate.expire(executeServerKey, 90, TimeUnit.SECONDS);
            }
            Long bussinessTime = System.currentTimeMillis();
            if (manualTaskDto.getLogger() != null) {
                manualTaskDto.getLogger().debug("开始执行用户定义的业务逻辑 groupId: {}", groupId);
            }
            r = supplier.get();
            if (r == null) {
                return true;
            }
            if (manualTaskDto.getLogger() != null) {
                manualTaskDto.getLogger().debug("结束执行用户定义的业务逻辑 groupId: {} 任务执行时间:{} ms", groupId, System.currentTimeMillis() - bussinessTime);
            }
        } catch (Throwable e) {
            //Throwable异常大于Exception,是Exception的父级,包括了Error等,如调用飞书接口,发送通知失败,会抛出Error,使用Exception就会抓取失败,防止有些异常抓取失败,导致线程阻塞
            if (manualTaskDto.getLogger() != null) {
                manualTaskDto.getLogger().warn("任务执行失败:manualTaskDto: {}", manualTaskDto, e);
            }
            return false;
        } finally {
            redisTemplate.delete(executeServerKey);
        }
        if (manualTaskDto.getLogger() != null) {
            manualTaskDto.getLogger().info("结束任务 groupId: {} 任务执行时间:{} ms", groupId, System.currentTimeMillis() - beginTime);
        }
        return Optional.ofNullable(r).orElse(Boolean.FALSE);
    }
}

5.3、参数类信息

package com.suyun.vehicle.dao;

import com.suyun.vehicle.helper.SpringScheduleHelper;
import org.slf4j.Logger;

import java.io.Serializable;
import java.util.List;

/**
 * Description:
 * <p>
 *
 * </p>
 *
 * @Author: leo.xiong
 * @CreateDate: 2023/7/4 16:58
 * @Email: leo.xiong@suyun360.com
 * @Since:
 */
public class ManualTaskDto implements Serializable {
    private static final long serialVersionUID = -2638699521129899896L;
    /**
     * 通知触发执行,否则不执行具体逻辑,默认false,如果设置为true,表示直接执行
     */
    private Boolean publishExecute = false;

    private SpringScheduleHelper springScheduleHelper;
    /**
     * 信号过滤
     */
    private String groupId;
    /**
     * 是否高可用,当机器信息不存在是,直接使用分发者作为机器
     */
    private Boolean highAvailability = true;
    /**
     * 定时器对象logger
     */
    private Logger logger;
    /**
     * 对象名称
     */
    private String beanName;
    /**
     * 获取任务的key {@link com.suyun.vehicle.conf.SpringScheduleConfig}
     */
    private String taskKey;
    /**
     * <p>是否广播模式(集群时每台服务器都执行)/单机模式</p>
     * <p>默认单机模式</p>
     * <p>如果valueOperations不为空,则默认使用单机模式,可以通过后置broadcast方法修改</p>
     */
    private Boolean broadcast = false;
    /**
     * <p>是否某个任务在注册机器未增减的时候使用固定的机器执行,默认不是,使用随机模式,可以自定义实现轮训模式,记录上一次执行的IP {@link com.suyun.vehicle.conf.SpringScheduleConfig}</p>
     * <p>使用广播模式,指定机器属性无效</p>
     */
    private Boolean fixedMachine = false;
    /**
     * 直接指定IP模式
     */
    private String ip;
    /**
     * 添加需要执行任务的IP集合
     */
    private List<String> executeIpList;

    private String jobParameters;
    /**
     * 是否执行测试环境,默认不执行
     */
    private Boolean testExecute = false;

    public ManualTaskDto() {
    }

    /**
     * @param springScheduleHelper 执行任务帮助类
     */
    private ManualTaskDto(String beanName, SpringScheduleHelper springScheduleHelper) {
        this.springScheduleHelper = springScheduleHelper;
        this.beanName = beanName;
    }

    public ManualTaskDto(String beanName, List<String> executeIpList, String jobParameters) {
        this.beanName = beanName;
        this.executeIpList = executeIpList;
        this.jobParameters = jobParameters;
    }

    public Logger getLogger() {
        return logger;
    }

    public ManualTaskDto logger(Logger logger) {
        this.logger = logger;
        return this;
    }

    public ManualTaskDto taskKey(String taskKey) {
        this.taskKey = taskKey;
        return this;
    }

    public String getTaskKey() {
        return taskKey;
    }

    public SpringScheduleHelper getSpringScheduleHelper() {
        return springScheduleHelper;
    }

    public static ManualTaskDto builder() {
        return new ManualTaskDto(null, null);
    }

    public static ManualTaskDto builder(SpringScheduleHelper springScheduleHelper) {
        return new ManualTaskDto(null, springScheduleHelper);
    }

    public static ManualTaskDto builder(String beanName, SpringScheduleHelper springScheduleHelper) {
        return new ManualTaskDto(beanName, springScheduleHelper);
    }

    public Boolean getBroadcast() {
        return broadcast;
    }

    public ManualTaskDto broadcast(Boolean broadcast) {
        this.broadcast = broadcast;
        return this;
    }

    public Boolean getFixedMachine() {
        return fixedMachine;
    }

    public ManualTaskDto fixedMachine(Boolean fixedMachine) {
        this.fixedMachine = fixedMachine;
        return this;
    }

    public String getIp() {
        return ip;
    }

    public ManualTaskDto ip(String ip) {
        this.ip = ip;
        return this;
    }

    public Boolean getPublishExecute() {
        return publishExecute;
    }

    public ManualTaskDto publishExecute(Boolean publishExecute) {
        this.publishExecute = publishExecute;
        return this;
    }

    public List<String> getExecuteIpList() {
        return executeIpList;
    }

    public ManualTaskDto executeIpList(List<String> executeIpList) {
        this.executeIpList = executeIpList;
        return this;
    }

    public String getBeanName() {
        return beanName;
    }

    public ManualTaskDto beanName(String beanName) {
        this.beanName = beanName;
        return this;
    }

    public String getJobParameters() {
        return jobParameters;
    }

    public ManualTaskDto jobParameters(String jobParameters) {
        this.jobParameters = jobParameters;
        return this;
    }

    public Boolean getTestExecute() {
        return testExecute;
    }

    public ManualTaskDto testExecute(Boolean testExecute) {
        this.testExecute = testExecute;
        return this;
    }

    public String getGroupId() {
        return groupId;
    }

    public ManualTaskDto groupId(String groupId) {
        this.groupId = groupId;
        return this;
    }

    public Boolean getHighAvailability() {
        return highAvailability;
    }

    public ManualTaskDto setHighAvailability(Boolean highAvailability) {
        this.highAvailability = highAvailability;
        return this;
    }

    @Override
    public String toString() {
        return "ManualTaskDto{" +
                "publishExecute=" + publishExecute +
                ", springScheduleHelper=" + springScheduleHelper +
                ", groupId='" + groupId + '\'' +
                ", beanName='" + beanName + '\'' +
                ", taskKey='" + taskKey + '\'' +
                ", broadcast=" + broadcast +
                ", fixedMachine=" + fixedMachine +
                ", executeIpList=" + executeIpList +
                ", jobParameters='" + jobParameters + '\'' +
                ", testExecute=" + testExecute +
                '}';
    }
}

5.4、定义调度实现接口

package com.suyun.vehicle.service;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.suyun.vehicle.conf.SpringScheduleConfig;
import com.suyun.vehicle.dao.ManualTaskDto;
import com.suyun.vehicle.helper.SpringScheduleHelper;
import com.suyun.vehicle.utils.NetworkUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Description:
 * <p>
 * 手动定时任务处理调用阿里云定时器 {@link com.suyun.vehicle.conf.SpringScheduleConfig}
 * </p>
 *
 * @Author: leo.xiong
 * @CreateDate: 2023/7/4 9:56
 * @Email: leo.xiong@suyun360.com
 * @Since:
 */
@FunctionalInterface
public interface ManualTaskService {

    String JOB_NAME = "jobInstance";

    String PARAM = "param";

    /**
     * 手动任务执行前做一定的校验->获取对应执行任务的对象,手动执行一次
     *
     * @param instanceParameters
     * @return
     * @throws Exception
     */
    Object manualProcess(String instanceParameters) throws Exception;

    /**
     * 校验参数
     *
     * @param instanceParameters
     * @return
     */
    default Object validParam(String instanceParameters) {
        if (StringUtils.isEmpty(instanceParameters)) {
            return "执行任务参数不能为空";
        }
        if (!instanceParameters.contains("{") || !instanceParameters.contains("}")) {
            return "执行任务参数不是JSON字符串";
        }
        Map<String, String> keyValue = null;
        try {
            keyValue = JSON.parseObject(instanceParameters, Map.class);
        } catch (Exception e) {
            return "执行任务参数不是JSON字符串 instanceParameters:" + instanceParameters;
        }
        if (!keyValue.containsKey(JOB_NAME)) {
            return "任务实例不能为空 instanceParameters:" + instanceParameters;
        }
        return keyValue;
    }

    /**
     * 子任务执行
     */
    interface SubManualTaskService {
        /**
         * 定时执行 @Scheduled
         *
         * @return
         */
        void process();

        /**
         * 无参执行
         *
         * @return
         */
        default Object processing() {
            return processing(null);
        }

        /**
         * 根据参数执行任务
         *
         * @param jobParameters
         * @return
         */
        Object processing(String jobParameters);

        /**
         * Redis消息订阅执行方法
         *
         * @param jobParameters
         * @return
         */
        Boolean publishExecute(String jobParameters);

        /**
         * 开始执行定时任务
         *
         * @param manualTaskDto 参数信息
         * @param supplier      执行的具体业务逻辑
         * @return
         */
        default Boolean execute(ManualTaskDto manualTaskDto, Supplier<Boolean> supplier) {
            return execute(manualTaskDto, null, supplier);
        }

        /**
         * 开始执行定时任务
         *
         * @param manualTaskDto 参数信息
         * @param methodName    方法名称 (一个类中只有一个@Schedlue注解可以为空,否则需要根据方法名称确定调度任务)
         * @param supplier      执行的具体业务逻辑
         * @return
         */
        default Boolean execute(ManualTaskDto manualTaskDto, String methodName, Supplier<Boolean> supplier) {
            Long beginTime = System.currentTimeMillis();
            if (manualTaskDto.getLogger() != null) {
                manualTaskDto.getLogger().debug("开始任务 name:{}", this.getClass().getSimpleName());
            }
            if (!before(manualTaskDto, methodName)) {
                if (manualTaskDto.getLogger() != null) {
                    manualTaskDto.getLogger().debug("任务不执行");
                }
                return true;
            }
            if (!manualTaskDto.getPublishExecute()) {
                if (CollectionUtils.isEmpty(manualTaskDto.getExecuteIpList())) {
                    return false;
                }
                //如果不是直接执行,需要发出通知
                manualTaskDto.getSpringScheduleHelper().send(manualTaskDto.getBeanName(), manualTaskDto.getExecuteIpList(), manualTaskDto.getJobParameters());
                return true;
            }
            if (!manualTaskDto.getExecuteIpList().contains(NetworkUtil.IP)) {
                //不是执行任务的IP,直接返回
                return true;
            }
            return manualTaskDto.getSpringScheduleHelper().executeServer(manualTaskDto, supplier, beginTime);
        }

        /**
         * 获取能执行任务的IP信息
         *
         * @param manualTaskDto
         * @param methodName
         * @return
         */
        default boolean before(ManualTaskDto manualTaskDto, String methodName) {
            String className = this.getClass().getName();
            if (StringUtils.isNotEmpty(methodName)) {
                manualTaskDto.taskKey(className + ":" + methodName);
            } else {
                manualTaskDto.taskKey(className);
            }
            if (StringUtils.isEmpty(manualTaskDto.getBeanName())) {
                int lastIndex = className.lastIndexOf(".");
                //首字母小写
                manualTaskDto.beanName(className.substring(lastIndex + 1, lastIndex + 2).toLowerCase() + className.substring(lastIndex + 2));
            }
            if (!manualTaskDto.getTestExecute() && manualTaskDto.getSpringScheduleHelper().testEnv()) {
                //如果用户未设置需要执行测试环境,代码测试,并且服务器确实是测试黄精时,无需执行,测试环境需要调试,直接设置状态为true,测试环境也会执行
                if (manualTaskDto.getLogger() != null) {
                    manualTaskDto.getLogger().debug("测试环境任务无需执行");
                }
                //如果测试环境无需执行
                return false;
            }
            if (manualTaskDto.getPublishExecute()) {
                //如果是直接执行
                manualTaskDto.executeIpList(Lists.newArrayList(NetworkUtil.IP));
                return true;
            }
            if (!manualTaskDto.getSpringScheduleHelper().distributeTaskServer(manualTaskDto.getTaskKey())) {
                if (manualTaskDto.getLogger() != null) {
                    manualTaskDto.getLogger().debug("如果不是分发任务服务器,直接返回");
                }
                //如果测试环境无需执行
                return false;
            }
            List<String> executorIpList = manualTaskDto.getSpringScheduleHelper().getActiveServer(manualTaskDto);
            if (CollectionUtils.isEmpty(executorIpList)) {
                if (manualTaskDto.getHighAvailability()) {
                    //没有有效的IP,直接使用自己,保证高可用
                    manualTaskDto.publishExecute(true);
                    //如果是直接执行
                    manualTaskDto.executeIpList(Lists.newArrayList(NetworkUtil.IP));
                    return true;
                }
                if (manualTaskDto.getLogger() != null) {
                    manualTaskDto.getLogger().info("不存在有效的服务器资源信息 groupId:{} taskKey:{}", manualTaskDto.getSpringScheduleHelper().getGroupId(), manualTaskDto.getTaskKey());
                    return false;
                }
            }
            /**
             * 如果只有一个活跃的IP
             * 1、活跃IP=当前服务器的IP,可以直接执行
             * 2、随机的IP或者指定的IP活跃,但并非服务器IP,需要通过redis消息推送执行
             */
            if (executorIpList.size() == 1 && NetworkUtil.IP.equals(executorIpList.get(0))) {
                //只有一台服务器活动,且IP相同,直接调用
                manualTaskDto.publishExecute(true);
                //如果是直接执行
                manualTaskDto.executeIpList(Lists.newArrayList(NetworkUtil.IP));
                return true;
            }
            manualTaskDto.executeIpList(executorIpList);
            return true;
        }
    }
}

5.5、手动调用其他调度任务

package com.suyun.vehicle.task;

import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.suyun.vehicle.dao.ManualTaskDto;
import com.suyun.vehicle.helper.SpringScheduleHelper;
import com.suyun.vehicle.service.ManualTaskService;
import com.suyun.vehicle.task.service.retry.FailedMessageHandleTimerTaskService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Description:
 * <p>
 * 拉取失败任务,进行重试,多次重试失败,发往重试失败落盘topic
 * 使用广播模式
 * </p>
 *
 * @Author: leo.xiong
 * @CreateDate: 2023/7/6 14:20
 * @Email: leo.xiong@suyun360.com
 * @Since:
 */
@Component
public class FailedMessageHandleTimerTask extends JavaProcessor implements ManualTaskService.SubManualTaskService {
    private static final Logger LOGGER = LoggerFactory.getLogger(FailedMessageHandleTimerTask.class);

    @Autowired
    private FailedMessageHandleTimerTaskService failedMessageHandleTimerTaskService;

    @Autowired
    private SpringScheduleHelper springScheduleHelper;

    @Override
    public ProcessResult process(JobContext context) {
        return processing(StringUtils.defaultIfEmpty(context.getInstanceParameters(), context.getJobParameters()));
    }

    @Override
    @Scheduled(cron = "0 */2 * * * ?")
    public void process() {
        processing();
    }

    @Override
    public ProcessResult processing(String jobParameters) {
        ManualTaskDto manualTaskDto = ManualTaskDto.builder(springScheduleHelper).logger(LOGGER).jobParameters(jobParameters);
        return new ProcessResult(execute(manualTaskDto, () -> publishExecute(jobParameters)));
    }
    
    @Override
    public Boolean publishExecute(String jobParameters) {
        return failedMessageHandleTimerTaskService.process(jobParameters);
    }
}

6、Spring定时器实现原理

实现原理

7、扩展实现

所有的定时任务都使用一秒钟调用一次,但需要另外配置一个真正的执行时间,分发机把每次最后一次执行时间存入Redis,是否需要执行,不执行,立即完成任务,通过最后一次执行时间跟当前系统时间做判断,这样在分发机宕机的失效时间之内就可以正常执行任务,而且执行的任务时间也是定时任务真正的执行时间,但它会导致线程池执行任务很频繁。

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐