async-queue.ts 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. /**
  2. * Copyright (c) 2019 mol* contributors, licensed under MIT, See LICENSE file for more info.
  3. *
  4. * @author David Sehnal <david.sehnal@gmail.com>
  5. */
  6. import { arrayRemoveInPlace } from './array';
  7. import { Subject } from 'rxjs';
  /**
   * Ordered queue in which only the first item is considered active; every
   * other item waits until it is signalled.
   *
   * A caller obtains a turn via `enqueue`, and reports completion via `handled`,
   * which signals whichever item is then first. `remove` takes an item out of
   * the queue and signals that item with `stillPresent: false`.
   */
  export class AsyncQueue<T> {
      /** Items in insertion order; index 0 is the item whose turn it is. */
      private queue: T[] = [];

      /** Emits the item being notified plus whether it is still queued. */
      private signal = new Subject<{ v: T, stillPresent: boolean }>();

      /** Number of items currently held in the queue. */
      get length() {
          return this.queue.length;
      }

      /**
       * Appends `v` to the queue.
       *
       * @param v item to add
       * @returns `true` synchronously when `v` is the only queued item; otherwise
       * a promise that settles when the signal next emits for `v` - with `true`
       * when emitted by `handled` (the item is then first in the queue), or
       * `false` when emitted by `remove`.
       */
      enqueue(v: T) {
          const sizeAfterPush = this.queue.push(v);
          if (sizeAfterPush !== 1) {
              return this.waitFor(v);
          }
          return true;
      }

      /**
       * Marks `v` as finished: it is passed to `arrayRemoveInPlace` (presumably
       * dropping it from the queue - confirm against './array'), and if any item
       * remains, the one now at the front is signalled with `stillPresent: true`.
       *
       * @param v item that has been processed
       */
      handled(v: T) {
          arrayRemoveInPlace(this.queue, v);
          if (this.queue.length === 0) return;

          const head = this.queue[0];
          this.signal.next({ v: head, stillPresent: true });
      }

      /**
       * Takes `v` out of the queue without treating it as handled. When
       * `arrayRemoveInPlace` reports a truthy result, a signal for `v` with
       * `stillPresent: false` is emitted so a pending `enqueue` promise for it
       * resolves to `false`. No other item is signalled here.
       *
       * @param v item to withdraw
       * @returns the value returned by `arrayRemoveInPlace`
       */
      remove(v: T) {
          const wasRemoved = arrayRemoveInPlace(this.queue, v);
          if (wasRemoved) {
              this.signal.next({ v, stillPresent: false });
          }
          return wasRemoved;
      }

      /**
       * Builds a promise that settles on the first signal whose item is strictly
       * equal to `t`, resolving to that signal's `stillPresent` flag.
       */
      private waitFor(t: T): Promise<boolean> {
          return new Promise<boolean>(resolve => {
              const subscription = this.signal.subscribe(event => {
                  // Signals for other items are not ours; keep listening.
                  if (event.v !== t) return;

                  // One notification is enough - stop listening before resolving.
                  subscription.unsubscribe();
                  resolve(event.stillPresent);
              });
          });
      }
  }