# 用 js 实现并发控制
类似的 js 库 eventproxy, async
/**
* @params list {Array} - 要迭代的数组
* @params limit {Number} - 并发数量控制数
* @params asyncHandle {Function} - 对`list`的每一个项的处理函数,参数为当前处理项,必须 return 一个Promise来确定是否继续进行迭代
* @return {Promise} - 返回一个 Promise 值来确认所有数据是否迭代完成
*/
let mapLimit = (list, limit, asyncHandle) => {
let recursion = (arr) => {
return asyncHandle(arr.shift())
.then(()=>{
if (arr.length !== 0) return recursion(arr) // 数组还未迭代完,递归(注意这里 return 的实际是一个继续执行的 promise)继续进行迭代
else return 'finish';
})
};
let listCopy = [].concat(list);
let asyncList = []; // 正在进行的所有并发异步操作
while(limit--) {
asyncList.push(recursion(listCopy));
}
return Promise.all(asyncList); // 所有并发异步操作都完成后,本次并发控制迭代完成
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
测试异步并发情况:
var dataLists = [1,2,3,4,5,6,7,8,9,11,100,123];
var count = 0;
mapLimit(dataLists, 3, (curItem)=>{
return new Promise(resolve => {
count++
setTimeout(()=>{
console.log(curItem, '当前并发量:', count--)
resolve();
}, Math.random() * 5000)
});
}).then(response => {
console.log('finish', response) // ["finish", "finish", "finish"] 这里感觉设计的不科学
})
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
测试手动抛出异常中断并发函数:
var dataLists = [1,2,3,4,5,6,7,8,9,11,100,123];
var count = 0;
mapLimit(dataLists, 3, (curItem)=>{
return new Promise((resolve, reject) => {
count++
setTimeout(()=>{
console.log(curItem, '当前并发量:', count--)
if(curItem > 4) reject('error happen')
resolve();
}, Math.random() * 5000)
});
}).then(response => {
console.log('finish', response)
})
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
感觉上边的实现还不是很好,因为最终无法拿到完整的顺序的结果
# 上边的优化
这个实现就类似 async#mapLimit 的实现
function sendRequest(list = [], max = 5, handler, callback) {
if (!handler) {
return new Error('handler 必传')
}
let result = [] // 最终结果
//将urls根据max做分组,一共分为times组,每个组里面最多包含max个url
let listObj = {};
let times = Math.ceil(list.length / max);
let currentIndex = 0;
for (let i = 0; i < times; i++) {
listObj[i] = list.slice(i * max, (i + 1) * max);
}
let send = () => {
if (listObj[currentIndex]) {
Promise.all(listObj[currentIndex].map((item, i) => handler(item, i + currentIndex * max)))
.then((res) => {
console.log(res, `第${currentIndex}批请求成功`);
result = [...result, ...res]
currentIndex++;
if (times === currentIndex) {
callback(result);
}
//此处可加个定时器,模拟请求接口耗时
//递归调用,直到 !listObj[currentIndex] 为止
send();
}).catch((err) => {
console.log(err);
})
}
};
send();
}
let urls = [
'aaa',
'bbb',
'ccc',
'ddd',
'eee'
];
let max = 4;
let callback = (result) => {
console.log('全部请求完成');
console.log(result)
};
let request = function (url, i) {
return new Promise(resolve => {
setTimeout(() => {
resolve('url ===' + url + '===' + i)
}, Math.floor(Math.random() * 1000 * 5))
})
}
sendRequest(urls, max, request, callback);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 一道面试题
class Scheduler {
constructor() {
this.count = 0
this.waitFn = []
}
add(promiseCreator) {
return new Promise((resolve, reject) => {
if (this.count < 2) {
// exec
this.count++
promiseCreator().then(data => {
console.log(data)
this.count--
if (this.waitFn.length > 0) {
this.add(this.waitFn.shift())
}
resolve(data)
})
} else {
this.waitFn.push(promiseCreator)
}
})
}
// add(promiseCreator) {
// if (this.count < 2) {
// // exec
// this.count++
// promiseCreator().then(data => {
// console.log(data)
// this.count--
// if (this.waitFn.length > 0) {
// this.add(this.waitFn.shift())
// }
// return data
// })
// } else {
// this.waitFn.push(promiseCreator)
// }
// }
}
const timeout = time => {
return new Promise(resolve => {
return setTimeout(resolve, time)
})
}
const scheduler = new Scheduler()
const addTask = (time, order) => {
scheduler.add(() =>
timeout(time).then(() => {
console.log(order, ':', Date.now())
return order
})
)
}
addTask(1000, 1)
addTask(200, 2)
addTask(300, 3)
addTask(700, 4)
// 2 ":" 1559717284359
// 3 ":" 1559717284458
// 4 ":" 1559717285060
// 1 ":" 1559717285156
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class Scheduler {
constructor(){
this.doing = 0;
this.queue = [];
}
add(promiseCreator) {
this.queue.push(promiseCreator);
this.dotask();
}
dotask(){
if(this.doing < 2){
if(this.queue.length !== 0){
this.doing += 1;
let item = this.queue.shift();
item().then(()=>{
this.doing -= 1;
this.dotask();
})
}
}
}
}
const timeout = time => {
return new Promise(resolve => {
setTimeout(resolve, time)
})
};
const scheduler = new Scheduler();
const addTask = (time, order) => {
scheduler.add(() => timeout(time)
.then(() => console.log(order, ':', Date.now()))
)
}
console.log(Date.now())
addTask(1000, 1)
addTask(200, 2)
addTask(300, 3)
addTask(700, 4)
// 2 ":" 1559722511192
// 3 ":" 1559722511495
// 1 ":" 1559722511995
// 4 ":" 1559722512200
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 补充
// async-pool并发请求控制
// 保证顺序和全部结果
function createRequest (tasks, pool = 5) {
let results = []
let together = new Array(pool).fill(null)
let index = 0
together = together.map(() => {
return new Promise((resolve, reject) => {
const run = function run () {
if (index >= tasks.length) {
resolve()
return
}
let old_index = index
let task = tasks[index++]
task().then(result => {
results[old_index] = result
run()
}).catch(reason => {
reject(reason)
})
}
run()
})
})
return Promise.all(together).then(() => {
return results
})
}
createRequest(tasks, 2).then(results => {
console.log('都成功认为成功,拿到完整请求结果且符合传入任务顺序', results)
}).catch(reason => {
console.log('只要有一个失败就认为失败', reason)
})
// 不保证全部成功
function createRequest (tasks, pool, callback) {
class TaskQuene {
running = 0
queue = []
results = []
pushTask (task) {
let self = this
self.queue.push(task)
self.next()
}
next () {
let self = this
while (self.running < pool && self.queue.length) {
self.running++
let task = self.queue.shift()
task().then(result => {
self.results.push(result)
}).finally(() => {
self.running--
self.next()
})
}
if (self.running === 0) {
callback(results)
}
}
}
let TQ = new TaskQueue
tasks.forEach(task => TQ.pushTask(task))
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69