使用rxjs实现队列

更新时间:2023-03-20 11:34:06标签:web前端rxjs

固定任务数量

1import { interval, merge, take, tap } from 'rxjs';
2
3const count1 = interval(1000).pipe(
4 take(10),
5 tap(v => {
6 // eslint-disable-next-line no-console
7 console.log(`count1:${v}`);
8 }),
9);
10
11const count2 = interval(3000).pipe(
12 take(2),
13 tap(v => {
14 // eslint-disable-next-line no-console
15 console.log(`count2:${v}`);
16 }),
17);
18
19const count3 = interval(3000).pipe(
20 take(5),
21 tap(v => {
22 // eslint-disable-next-line no-console
23 console.log(`count3:${v}`);
24 }),
25);
26
27const count4 = interval(2000).pipe(
28 take(10),
29 tap(v => {
30 // eslint-disable-next-line no-console
31 console.log(`count4:${v}`);
32 }),
33);
34
35const count5 = interval(5000).pipe(
36 take(5),
37 tap(v => {
38 // eslint-disable-next-line no-console
39 console.log(`count5:${v}`);
40 }),
41);
42
43export function startQueue() {
44 // 表示同时执行以上所有observable,但最大队列数为2
45 return merge(count1, count2, count3, count4, count5, 2).subscribe();
46}

点击开始执行startQueue,打开f12查看打印结果

动态任务数量

1import { fromEvent, mergeScan, tap, timer, map } from 'rxjs';
2
3export function dynamicQueue(dom: HTMLElement) {
4 return fromEvent(dom, 'click').pipe(
5 mergeScan((acc, value, index) => {
6 const duration = Math.random() * 10;
7 return timer(duration * 1000).pipe(
8 map(() => acc + 1),
9 tap(() => {
10 // eslint-disable-next-line no-console
11 console.log(`${index + 1}个下载完毕,用时${duration}`);
12 }),
13 );
14 }, 0, 2), // 0为seed,2为最大队列数
15 ).subscribe();
16}

连续点击下载创建多个下载任务,模拟最大并发下载为2的场景

模拟下载场景

模拟最大并发数量为3的下载队列

点击左侧按钮添加任务

    源码

    1import { Button } from 'antd';
    2import { combineClass, randomStr } from '@shared/utils';
    3import { FC, useCallback, useEffect, useMemo, useRef, useState } from 'react';
    4import { Subject, interval, tap, mergeMap, of, concat, takeWhile, Observable, scan } from 'rxjs';
    5import styles from './downloader.module.scss';
    6
    7// 模拟下载过程
    8const virtualDownload = () => {
    9 const speed = Math.random() * 0.3;
    10 return interval(1000).pipe(
    11 scan((acc) => Math.min(1, acc + speed), 0),
    12 mergeMap((percent) => {
    13 if (percent >= 1) {
    14 return concat(of(percent), of(null));
    15 }
    16 return of(percent);
    17 }),
    18 takeWhile(v => typeof v === 'number'),
    19 ) as Observable<number>;
    20};
    21
    22interface VirtualFile {
    23 name: string;
    24 percent: number;
    25 status: 'done' | 'waiting' | 'pending';
    26}
    27
    28const Downloader: FC = () => {
    29
    30 const [files, setFiles] = useState<VirtualFile[]>([]);
    31
    32 const downloadTrigger = useRef(useMemo(() => {
    33 return new Subject<string>();
    34 }, []));
    35
    36 const onClick = useCallback(() => {
    37 const filename = `${randomStr(files.length + 1)}.zip`; // 随机模拟一个文件名
    38 downloadTrigger.current.next(filename);
    39 }, [files.length]);
    40
    41 useEffect(() => {
    42 const subscription = downloadTrigger.current.pipe(
    43 mergeMap(name => {
    44 const newFile: VirtualFile = {
    45 name,
    46 percent: 0,
    47 status: 'waiting',
    48 };
    49 setFiles(prev => {
    50 const next = [...prev];
    51 next.push(newFile);
    52 return next;
    53 });
    54 return of(newFile);
    55 }),
    56 mergeMap((value) => {
    57 return virtualDownload().pipe(
    58 tap((percent) => {
    59 value.percent = percent;
    60 value.status = percent < 1 ? 'pending' : 'done';
    61 setFiles(prev => [...prev]);
    62 }),
    63 );
    64 }, 3), // 3为最大并发数
    65 ).subscribe();
    66 return () => {
    67 subscription.unsubscribe();
    68 };
    69 }, []);
    70
    71 return (
    72 <div>
    73 <div>
    74 <Button onClick={onClick}>添加下载任务</Button>
    75 <span>点击左侧按钮添加任务</span>
    76 </div>
    77 <ul className={styles.list}>
    78 {
    79 files.map(v => {
    80 return (
    81 <li key={v.name}>
    82 <span>{v.name}</span>
    83 <span className={styles.percent}>{(v.percent * 100).toFixed(2)}%</span>
    84 <span
    85 className={combineClass(styles[v.status])}
    86 >{v.status === 'waiting' ? '等待中' : v.status === 'pending' ? '下载中' : '已完成'}</span>
    87 </li>
    88 );
    89 })
    90 }
    91 </ul>
    92 </div>
    93 );
    94};
    95
    96export { Downloader };

    Promise版本

    点击左侧按钮添加任务

      源码

      1interface Task<T>{
      2 (): Promise<T>;
      3}
      4
      5export class Queue<T=any>{
      6
      7 private concurrent: number;
      8
      9 private tasks: Task<T>[] = [];
      10
      11 private pending = new Set<ReturnType<Task<T>>>();
      12
      13 private status: 0 | 1 = 1;
      14
      15 constructor(concurrent = 3) {
      16 this.concurrent = concurrent;
      17 }
      18
      19 private runNext() {
      20 if (this.status === 1 && this.size() > 0 && this.concurrent > this.pending.size) {
      21 const task = this.dequeue()!();
      22 task.finally(() => {
      23 this.pending.delete(task);
      24 this.runNext();
      25 });
      26 this.pending.add(task);
      27 this.runNext();
      28 return task;
      29 }
      30 }
      31
      32 public async run(tasks: Task<T>[]) {
      33 const data = tasks.map((task) => {
      34 let resolved: (result: T) => void;
      35 const promise = new Promise<T>((resolve) => {
      36 resolved = resolve;
      37 });
      38 return {
      39 task: async () => {
      40 const ret = await task();
      41 resolved(ret);
      42 return ret;
      43 },
      44 promise,
      45 };
      46 });
      47 this.tasks.push(
      48 ...data.map((v) => v.task),
      49 );
      50 this.runNext();
      51 return await Promise.all(data.map((v) => v.promise));
      52 }
      53
      54 public setConcurrent(concurrent: number) {
      55 this.concurrent = concurrent;
      56 this.runNext();
      57 }
      58
      59 public enqueue(...tasks: Task<T>[]) {
      60 this.tasks.push(...tasks);
      61 this.runNext();
      62 }
      63
      64 public dequeue() {
      65 return this.tasks.shift();
      66 }
      67
      68 public start() {
      69 this.status = 1;
      70 this.runNext();
      71 }
      72
      73 public stop() {
      74 this.status = 0;
      75 }
      76
      77 public clear() {
      78 return this.tasks.splice(0, this.tasks.length);
      79 }
      80
      81 public size() {
      82 return this.tasks.length;
      83 }
      84
      85}