一、背景

开发过程中或多或少会遇到某些场景,要求数据在规定的时间内如果没处理就要失效掉;

如:用户下单,订单在30分钟内没支付就要自动取消,防止长时间占用库存等;

面对这种情况我们来扒拉一下:

  1. 系统启一个定时任务,定时扫库,取出超过30分钟的数据,取消释放库存
  2. 用户下单后将订单放入MQ的延迟队列(比如RocketMQ的延迟消息),依赖于MQ的通知实时触发取消
  3. 将数据放入Redis,定时扫描Redis
  4. 将数据放入收尾相连的环形链路中,依次触发超时

二、实现思路

1、定时任务扫库

在分布式定时任务系统中启动一个定时任务,定时任务时间可根据实际业务定义(1分钟、5分钟)

存在问题:

  1. 如果是单库:通常数据量比较多,如果每秒扫描可能比较耗资源,往往是每分钟或、、、超时有误差
  2. 如果是分库分表:需要扫描全量库表,

好处:

  1. 实现相对简单,就是一次普通的查询

数据量少的情况下可以使用,注意查询条件要配以索引,否则会非常慢

2、依赖MQ的延迟队列实现

以RocketMQ为例:

当Producer端发送延迟消息时,Broker会创建一个SCHEDULE_TOPIC*开头的Topic,然后以该topic持久化

然后给每个topic启动定时任务扫描,将数据放入真正的Topic推送到Consumer端消费

存在问题:有些消息队列不支持延迟队列比如rabbit,需要依赖插件;rocketmq延迟时间只支持配置的枚举时间,有一定局限性

3、基于redis定时--全量扫描实现

这种实现跟第一种基于数据库基本类似,就是将数据库换成redis

存在问题:每次都是全量数据拉取,如果数据量过大会这种资源占用,再说redis单线程处理,要防止过大数据拉取

4、基于redis--将数据放入List数据结构

放数据的时候使用rightPush函数从右测逐个放入

启动定时任务(每秒)取(index(0)函数)List第一个,判断是否超时

如果没超时:结束本次

如果超时:获取前100个(按业务自定义数量)判断是否超时,如果全部超时再取100判断,如果循环遇到未超时的结束本次,并将超时的从reids删除leftPop()

好处:相对上种来说,避免了全量拉取数据

存在问题:可能出现多次网络交互,每次拉取数量要按照自己业务来调整

5、基于redis--将数据存入map

本地启动一个定时任务(每秒),redis维护一个变量

订单入库时:先取reids里的变量,然后以该变量命名新建map,将数据放入map

每跑一次定时任务变量++,然后拉取该map下的订单,判断超时的对应删除

变量==到达(超时时间秒数)个数时,将变量赋值1

这样就生成了一个收尾相连的环形List<Map>

好处:数据精确到秒级失效,每次从redis拉取的数据最多是2秒内的数据,数据量可空

存在问题:如果超时时间相对较长,Redis里将创建过多的map

三、代码实现

针对第5种实现

1、订单进入时的操作

private static final String ANNULAR_NUM ="ORDER:INDEX_NUM";
public void annularAdd(String orderNo){
    //ANNULAR_NUM,自增变量(如:1-120)
	Object obj = redisTemplate.opsForValue().get(ANNULAR_NUM);
	if (obj!=null){
	    //num 当前的自增变量值(1-120)
		Integer num = Integer.valueOf(obj.toString());
		redisTemplate.opsForHash().put("ORDER:MAP:"+num,
                                        orderNo,
            LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()+"");
	}
}

查询reids自增变量,如果不为空就将订单放入对应的map

2、启动定时任务(每秒)

private static final String ANNULAR_NUM ="ORDER:INDEX_NUM";
public void annularAddSchedule() {
	Object obj = redisTemplate.opsForValue().get(ANNULAR_NUM);
	int num=obj!=null ? Integer.valueOf(obj.toString()) : 0;
	Long increment=1L;
	//如果当前redis自增数值大于等于超时时间秒数,就置为初始值
	if (num >= 120) {
		redisTemplate.opsForValue().set(ANNULAR_NUM,increment+"");
	}else {
		increment = redisTemplate.opsForValue().increment(ANNULAR_NUM);
	}
	System.out.println("当前map"+increment);
	Map map = redisTemplate.opsForHash().entries("ORDER:MAP:" + increment);
	List<String> orders = new ArrayList<>();
	long timeout = LocalDateTime.now().plusSeconds(-119).toInstant(ZoneOffset.of("+8")).toEpochMilli();
	map.forEach((key, value) -> {
		Long aLong = Long.valueOf(value.toString());
		//如果当前时间-超时时间>订单的超时时间,说明订单已超时
		if (timeout > aLong) {
			orders.add(key.toString());
		}
	});
	System.out.println("超时的orders=========" + orders.size());
	if (orders != null && orders.size() > 0) {
		Long delete = redisTemplate.opsForHash().delete("ORDER:MAP:" + increment, orders.toArray());
        //使用线程池处理自己业务逻辑
		executorService.execute(() -> {
			userInterface.getUser(JSON.toJSONString(orders));
		});
		System.out.println("delete=========" + delete);
	}
}

公众号主要记录各种源码、面试题、微服务技术栈,帮忙关注一波,非常感谢

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐