使用rxjs实现队列

更新时间:2023-03-20 11:34:06标签:web前端rxjs

固定任务数量

1import { interval, merge, take, tap } from 'rxjs';
2
3const count1 = interval(1000).pipe(
4 take(10),
5 tap(v => {
6 // eslint-disable-next-line no-console
7 console.log(`count1:${v}`);
8 }),
9);
10
11const count2 = interval(3000).pipe(
12 take(2),
13 tap(v => {
14 // eslint-disable-next-line no-console
15 console.log(`count2:${v}`);
16 }),
17);
18
19const count3 = interval(3000).pipe(
20 take(5),
21 tap(v => {
22 // eslint-disable-next-line no-console
23 console.log(`count3:${v}`);
24 }),
25);
26
27const count4 = interval(2000).pipe(
28 take(10),
29 tap(v => {
30 // eslint-disable-next-line no-console
31 console.log(`count4:${v}`);
32 }),
33);
34
35const count5 = interval(5000).pipe(
36 take(5),
37 tap(v => {
38 // eslint-disable-next-line no-console
39 console.log(`count5:${v}`);
40 }),
41);
42
43export function startQueue() {
44 // 表示同时执行以上所有observable,但最大队列数为2
45 return merge(count1, count2, count3, count4, count5, 2).subscribe();
46}

点击开始执行startQueue,打开f12查看打印结果

动态任务数量

1import { fromEvent, mergeScan, tap, timer, map } from 'rxjs';
2
3export function dynamicQueue(dom: HTMLElement) {
4 return fromEvent(dom, 'click').pipe(
5 mergeScan((acc, value, index) => {
6 const duration = Math.random() * 10;
7 return timer(duration * 1000).pipe(
8 map(() => acc + 1),
9 tap(() => {
10 // eslint-disable-next-line no-console
11 console.log(`${index + 1}个下载完毕,用时${duration}`);
12 }),
13 );
14 }, 0, 2), // 0为seed,2为最大队列数
15 ).subscribe();
16}

连续点击下载创建多个下载任务,模拟最大并发下载为2的场景

模拟下载场景

模拟最大并发数量为3的下载队列

点击左侧按钮添加任务

    源码

    1import { Button } from 'antd';
    2import { combineClass, randomStr } from '@shared/utils';
    3import { FC, useCallback, useEffect, useMemo, useRef, useState } from 'react';
    4import { Subject, interval, tap, mergeMap, of, concat, takeWhile, Observable, scan } from 'rxjs';
    5import styles from './downloader.module.scss';
    6
    7// 模拟下载过程
    8const virtualDownload = () => {
    9 const speed = Math.random() * 0.3;
    10 return interval(1000).pipe(
    11 scan((acc) => Math.min(1, acc + speed), 0),
    12 mergeMap((percent) => {
    13 if (percent >= 1) {
    14 return concat(of(percent), of(null));
    15 }
    16 return of(percent);
    17 }),
    18 takeWhile(v => typeof v === 'number'),
    19 ) as Observable<number>;
    20};
    21
    22interface VirtualFile {
    23 name: string;
    24 percent: number;
    25 status: 'done' | 'waiting' | 'pending';
    26}
    27
    28const Downloader: FC = () => {
    29
    30 const [files, setFiles] = useState<VirtualFile[]>([]);
    31
    32 const downloadTrigger = useRef(useMemo(() => {
    33 return new Subject<string>();
    34 }, []));
    35
    36 const onClick = useCallback(() => {
    37 const filename = `${randomStr(files.length + 1)}.zip`; // 随机模拟一个文件名
    38 downloadTrigger.current.next(filename);
    39 }, [files.length]);
    40
    41 useEffect(() => {
    42 const subscription = downloadTrigger.current.pipe(
    43 mergeMap(name => {
    44 const newFile: VirtualFile = {
    45 name,
    46 percent: 0,
    47 status: 'waiting',
    48 };
    49 setFiles(prev => {
    50 const next = [...prev];
    51 next.push(newFile);
    52 return next;
    53 });
    54 return of(newFile);
    55 }),
    56 mergeMap((value) => {
    57 return virtualDownload().pipe(
    58 tap((percent) => {
    59 value.percent = percent;
    60 value.status = percent < 1 ? 'pending' : 'done';
    61 setFiles(prev => [...prev]);
    62 }),
    63 );
    64 }, 3), // 3为最大并发数
    65 ).subscribe();
    66 return () => {
    67 subscription.unsubscribe();
    68 };
    69 }, []);
    70
    71 return (
    72 <div>
    73 <div>
    74 <Button onClick={onClick}>添加下载任务</Button>
    75 <span>点击左侧按钮添加任务</span>
    76 </div>
    77 <ul className={styles.list}>
    78 {
    79 files.map(v => {
    80 return (
    81 <li key={v.name}>
    82 <span>{v.name}</span>
    83 <span className={styles.percent}>{(v.percent * 100).toFixed(2)}%</span>
    84 <span
    85 className={combineClass(styles[v.status])}
    86 >{v.status === 'waiting' ? '等待中' : v.status === 'pending' ? '下载中' : '已完成'}</span>
    87 </li>
    88 );
    89 })
    90 }
    91 </ul>
    92 </div>
    93 );
    94};
    95
    96export { Downloader };

    Promise版本

    点击左侧按钮添加任务

      源码

      1interface Task<T>{
      2 (): Promise<T>;
      3}
      4
      5export class Queue<T=any>{
      6 private concurrent: number;
      7 private tasks: Task<T>[] = [];
      8 private pending = new Set<ReturnType<Task<T>>>();
      9 private status: 0 | 1 = 1;
      10 constructor(concurrent = 3) {
      11 this.concurrent = concurrent;
      12 }
      13 private check() {
      14 if (this.status === 1) {
      15 const count = this.concurrent - this.pending.size;
      16 this.tasks.splice(0, count).forEach(fn => {
      17 const ret = fn();
      18 ret.finally(() => {
      19 this.pending.delete(ret);
      20 this.check();
      21 });
      22 this.pending.add(ret);
      23 });
      24 }
      25 }
      26 public setConcurrent(concurrent: number) {
      27 this.concurrent = concurrent;
      28 this.check();
      29 }
      30 public addTask(...tasks: Task<T>[]) {
      31 this.tasks.push(...tasks);
      32 this.check();
      33 }
      34 public start() {
      35 this.status = 1;
      36 this.check();
      37 }
      38 public stop() {
      39 this.status = 0;
      40 }
      41}