最初开发目的是做一个axios请求并发的控制功能。后来发现其他场景可能也有类似需要控制并发的需求,于是将此功能其抽离成公共的类,在需要的场景使用。

实现步骤

思考1:如何让一个事件处于等待执行的状态?立马想到的就是 Promise、async、await

思考2:需要记录哪些状态来实现?

        ①最大并发数量、②当前进行中的任务数量、③存储等待中的任务

思考3:每个事件执行前都要过安检 (自定义的interceptors方法),放行还是拦截,这是个问题!

思考4:放行下一个被拦截的任务。

结合1234:利用async、await和new Promise的resolve方法进行事件的执行控制。

        如果 最大并发数量>当前进行中的任务数量,则调用resolve方法,结束await等待。

        否则 将resolve方法存储到数组中,等待有任务完成后再从数组中提取并执行第一个resolve。

        直至 存储任务的数组为空。

并发拦截的类诞生

// 并发拦截
class ConcurrentInterceptors {
    constructor(maxProgressCount) {
        // 最大并发数量
        this.maxProgressCount = maxProgressCount;
        // 进行中的任务数量
        this.inProgressCount = 0;
        // 等待中的任务
        this.waitingArr = [];
    }

    /**
     * 拦截器
     * @return Promise.resolve()
     */
    interceptors = () => {
        return new Promise(resolve => {
            const { maxProgressCount, inProgressCount } = this
            // 非数字、isNaN、不大于0 不拦截
            if (typeof maxProgressCount !== 'number' || isNaN(maxProgressCount) || maxProgressCount <= 0) {
                resolve()
                return
            }
            // 当前进行的任务数量是否大于最大并发数量
            if (inProgressCount < maxProgressCount) { 
                // 未超出数量
                this.inProgressCount += 1
                resolve()
            }else {
                // 超出数量
                this.waitingArr.push(resolve);
            }
        });
    }

    /**
     * 启动下一个被拦截的任务
     */
    next = () => {
        // 当前进行中的数量减一
        this.inProgressCount -= 1

        // 等待发送的请求的移除一个
        const nextResolve = this.waitingArr.shift();
        if(nextResolve){
            // 要进行下一个任务了 当前进行中的数量加一
            this.inProgressCount += 1
            nextResolve();
        }
    }

    /**
     * 更新最大并发数量
     * @param newMaxProgressCount
     */
    upMaxProgressCount = (newMaxProgressCount)=>{
        this.maxProgressCount = newMaxProgressCount;
    }
}

export default ConcurrentInterceptors

如何使用?示例①:在axios中使用

import axios from 'axios';
// 引入并发拦截器
import ConcurrentInterceptors from './concurrentInterceptors';
// 创建并发实例对象,并设置最大并发数量:2
const concurrent = new ConcurrentInterceptors(2)

// 创建axios实例对象
const instance = axios.create({});

// axios自身的请求拦截器
instance.interceptors.request.use(
    async config => {
        /**
         * 关键代码 
         * @description 并发拦截器启动 进行数量检测
         */
        await concurrent.interceptors()

        return config;
    },
    error => Promise.reject(error)
);

// axios自身的响应拦截器
instance.interceptors.response.use(
    response => {
        /**
         * 关键代码 
         * @description 通知并发拦截器有任务完成了 请放行下一个任务!
         */
        concurrent.next()

        return response;
    },
    error => {
        /**
         * 关键代码 
         * @description 通知并发拦截器有任务完成了 请放行下一个任务!
         */
        concurrent.next()
        
        return Promise.reject(error);
    }
);

以上并发的限制功能已经实现,可根据自己需求进行修改或直接使用。或者继续往下阅读。

结束?

还没!有新的问题:进行中的任务数量等待中的任务数据仅供内部方法使用,不能在类的外部修改。需要将类的属性私有化处理。于是有了优化版,修改为使用 WeakMap 的Class私有属性版本。使用方法同 示例① axios示例。

私有属性版本 (优化版)

const privateWeakMap = new WeakMap();
function getConcurrent(_this) {
    return privateWeakMap.get(_this)
}
class ConcurrentInterceptors {
    /**
     * 最大并发数量
     * @param maxProgressCount
     * @typeof number
     */
    constructor(maxProgressCount) {
        privateWeakMap.set(this,
            {
                // 最大并发数量
                _maxProgressCount: maxProgressCount,
                // 进行中的任务数量
                _inProgressCount: 0,
                // 等待中的任务
                _waitingArr: [],
            }
        );
    }

    /**
     * 拦截器
     * @return Promise.resolve()
     */
    interceptors = () => {
        const { _inProgressCount, _maxProgressCount } = getConcurrent(this)
        return new Promise(resolve => {
            // 非数字、isNaN、不大于0 不拦截
            if (typeof _maxProgressCount !== 'number' || isNaN(_maxProgressCount) || _maxProgressCount <= 0) {
                resolve()
                return
            }
            // 当前进行的任务数量是否大于最大并发数量
            if (_inProgressCount < _maxProgressCount) {
                // 未超出最大并发数量
                getConcurrent(this)._inProgressCount += 1
                resolve()
            } else {
                // 超出最大并发数量
                getConcurrent(this)._waitingArr.push(resolve);
            }
        });
    }

    /**
     * 启动下一个被拦截的任务
     */
    next = () => {
        // 当前进行中的数量减一
        getConcurrent(this)._inProgressCount -= 1
        
        // 等待发送的请求的移除一个
        const nextResolve = getConcurrent(this)._waitingArr.shift();
        if (nextResolve) {
            // 要进行下一个任务了 当前进行中的数量加一
            getConcurrent(this)._inProgressCount += 1
            nextResolve();
        }
    }

    /**
     * 更新最大并发数量
     * @param newMaxProgressCount
     */
    updateMaxProgressCount = (newMaxProgressCount) => {
        getConcurrent(this)._maxProgressCount = newMaxProgressCount;
    }

    /**
     * 获取_maxProgressCount属性值
     */
    get maxProgressCount() {
        return getConcurrent(this)._maxProgressCount
    }
    set maxProgressCount(newVal) {
        this.updateMaxProgressCount(newVal)
    }

    /**
     * 获取_sendAxiosIngCount属性值
     */
    get inProgressCount() {
        return getConcurrent(this)._inProgressCount
    }
    set inProgressCount(newVal) {
        throw new Error(`inProgressCount为只读属性`);
    }

    /**
     * 获取_awaitSendAxiosArr属性值
     */
    get waitingArr() {
        return getConcurrent(this)._waitingArr
    }
    set waitingArr(newVal) {
        throw new Error(`waitingArr为只读属性`);
    }
}

export default ConcurrentInterceptors

最后

1.还是不清楚如用使用?

将 ConcurrentInterceptors 类(第一个或者私有属性的都行)复制到 js文件,在需要的文件引入,并初始化实例对象

① 引入并发拦截器
import ConcurrentInterceptors from './concurrentInterceptors.js';
② 创建并发实例对象,并设置最大并发数量: num
const concurrent = new ConcurrentInterceptors(num)

如果不是axios则需要一个类似于axios公共拦截器一样的方法,并在此方法里调用 (见示例②)

③ 并发拦截器启动 进行数量检测
await concurrent.interceptors()

在每个事件执行完的方法 (见示例②)

④ 通知并发拦截器有任务完成了 执行下一个任务
concurrent.next() 

示例②

import ConcurrentInterceptors from './concurrentInterceptors.js';

// 最大并发数量设置为2
const concurrent = new ConcurrentInterceptors(2)

function fn1() {
  // Do something
  console.log('函数fn1');

  // fn1执行完了,可以进行下一个任务了
  concurrent.next()
}

function fn2() {
  // Do something
  console.log('函数fn2');

  // fn2执行完了,可以进行下一个任务了
  concurrent.next()
}

async function publicFn(type) {
  // 并发拦截器启动 进行数量检测
  await concurrent.interceptors();

  // 等待执行
  switch (type) {
    case 'fn1':
      fn1()
      break;
    case 'fn2':
      fn2()
      break;
  }
}

2.只能axios使用吗?

不是。任何需要控制并发的场景都可使用

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐