使用rxjs实现队列
更新时间:2023-03-20 11:34:06标签:web前端rxjs
固定任务数量
1import { interval, merge, take, tap } from 'rxjs';23const count1 = interval(1000).pipe(4 take(10),5 tap(v => {6 // eslint-disable-next-line no-console7 console.log(`count1:${v}`);8 }),9);1011const count2 = interval(3000).pipe(12 take(2),13 tap(v => {14 // eslint-disable-next-line no-console15 console.log(`count2:${v}`);16 }),17);1819const count3 = interval(3000).pipe(20 take(5),21 tap(v => {22 // eslint-disable-next-line no-console23 console.log(`count3:${v}`);24 }),25);2627const count4 = interval(2000).pipe(28 take(10),29 tap(v => {30 // eslint-disable-next-line no-console31 console.log(`count4:${v}`);32 }),33);3435const count5 = interval(5000).pipe(36 take(5),37 tap(v => {38 // eslint-disable-next-line no-console39 console.log(`count5:${v}`);40 }),41);4243export function startQueue() {44 // 表示同时执行以上所有observable,但最大队列数为245 return merge(count1, count2, count3, count4, count5, 2).subscribe();46}
点击开始
执行startQueue,打开f12查看打印结果
动态任务数量
1import { fromEvent, mergeScan, tap, timer, map } from 'rxjs';23export function dynamicQueue(dom: HTMLElement) {4 return fromEvent(dom, 'click').pipe(5 mergeScan((acc, value, index) => {6 const duration = Math.random() * 10;7 return timer(duration * 1000).pipe(8 map(() => acc + 1),9 tap(() => {10 // eslint-disable-next-line no-console11 console.log(`第${index + 1}个下载完毕,用时${duration}秒`);12 }),13 );14 }, 0, 2), // 0为seed,2为最大队列数15 ).subscribe();16}
连续点击下载
创建多个下载任务,模拟最大并发下载为2的场景
模拟下载场景
模拟最大并发数量为3的下载队列
点击左侧按钮添加任务
源码
1import { Button } from 'antd';2import { combineClass, randomStr } from '@shared/utils';3import { FC, useCallback, useEffect, useMemo, useRef, useState } from 'react';4import { Subject, interval, tap, mergeMap, of, concat, takeWhile, Observable, scan } from 'rxjs';5import styles from './downloader.module.scss';67// 模拟下载过程8const virtualDownload = () => {9 const speed = Math.random() * 0.3;10 return interval(1000).pipe(11 scan((acc) => Math.min(1, acc + speed), 0),12 mergeMap((percent) => {13 if (percent >= 1) {14 return concat(of(percent), of(null));15 }16 return of(percent);17 }),18 takeWhile(v => typeof v === 'number'),19 ) as Observable<number>;20};2122interface VirtualFile {23 name: string;24 percent: number;25 status: 'done' | 'waiting' | 'pending';26}2728const Downloader: FC = () => {2930 const [files, setFiles] = useState<VirtualFile[]>([]);3132 const downloadTrigger = useRef(useMemo(() => {33 return new Subject<string>();34 }, []));3536 const onClick = useCallback(() => {37 const filename = `${randomStr(files.length + 1)}.zip`; // 随机模拟一个文件名38 downloadTrigger.current.next(filename);39 }, [files.length]);4041 useEffect(() => {42 const subscription = downloadTrigger.current.pipe(43 mergeMap(name => {44 const newFile: VirtualFile = {45 name,46 percent: 0,47 status: 'waiting',48 };49 setFiles(prev => {50 const next = [...prev];51 next.push(newFile);52 return next;53 });54 return of(newFile);55 }),56 mergeMap((value) => {57 return virtualDownload().pipe(58 tap((percent) => {59 value.percent = percent;60 value.status = percent < 1 ? 'pending' : 'done';61 setFiles(prev => [...prev]);62 }),63 );64 }, 3), // 3为最大并发数65 ).subscribe();66 return () => {67 subscription.unsubscribe();68 };69 }, []);7071 return (72 <div>73 <div>74 <Button onClick={onClick}>添加下载任务</Button>75 <span>点击左侧按钮添加任务</span>76 </div>77 <ul className={styles.list}>78 {79 files.map(v => {80 return (81 <li key={v.name}>82 <span>{v.name}</span>83 <span className={styles.percent}>{(v.percent * 100).toFixed(2)}%</span>84 <span85 className={combineClass(styles[v.status])}86 >{v.status === 'waiting' ? '等待中' : v.status === 'pending' ? '下载中' : '已完成'}</span>87 </li>88 );89 })90 }91 </ul>92 </div>93 );94};9596export { Downloader };
Promise版本
点击左侧按钮添加任务
源码
1interface Task<T>{2 (): Promise<T>;3}45export class Queue<T=any>{6 private concurrent: number;7 private tasks: Task<T>[] = [];8 private pending = new Set<ReturnType<Task<T>>>();9 private status: 0 | 1 = 1;10 constructor(concurrent = 3) {11 this.concurrent = concurrent;12 }13 private check() {14 if (this.status === 1) {15 const count = this.concurrent - this.pending.size;16 this.tasks.splice(0, count).forEach(fn => {17 const ret = fn();18 ret.finally(() => {19 this.pending.delete(ret);20 this.check();21 });22 this.pending.add(ret);23 });24 }25 }26 public setConcurrent(concurrent: number) {27 this.concurrent = concurrent;28 this.check();29 }30 public addTask(...tasks: Task<T>[]) {31 this.tasks.push(...tasks);32 this.check();33 }34 public start() {35 this.status = 1;36 this.check();37 }38 public stop() {39 this.status = 0;40 }41}