observable.ts 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. /**
  2. * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
  3. *
  4. * @author David Sehnal <david.sehnal@gmail.com>
  5. */
  6. import { Task } from '../task'
  7. import { RuntimeContext } from './runtime-context'
  8. import { Progress } from './progress'
  9. import { now } from '../util/now'
  10. import { Scheduler } from '../util/scheduler'
  /**
   * Runs a task while reporting its progress tree to an observer.
   *
   * @param task Task to run.
   * @param observer Callback that receives progress snapshots.
   * @param updateRateMs Update rate in milliseconds stored in the shared progress info (defaults to 250).
   * @returns The promise returned by `execute` for the root task.
   */
  function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) {
      const sharedInfo = ProgressInfo(task, observer, updateRateMs);
      // The root context is bound to the root node of the freshly created progress tree.
      const rootContext = new ObservableRuntimeContext(sharedInfo, sharedInfo.root);
      return execute(task, rootContext);
  }
  namespace ExecuteObservable {
      /** When true, every error caught while executing a task is also written with `console.error`. */
      export let PRINT_ERRORS_TO_STD_ERR: boolean = false;
  }
  /**
   * Builds the initial progress record for a task: empty message, not yet started,
   * abortable, indeterminate, with `current` and `max` both at 0.
   *
   * @param task Task whose `id` and `name` are copied into the record.
   * @returns A new progress object.
   */
  function defaultProgress(task: Task<any>): Task.Progress {
      const { id, name } = task;
      const initial: Task.Progress = {
          taskId: id,
          taskName: name,
          message: '',
          startedTime: 0,
          canAbort: true,
          isIndeterminate: true,
          current: 0,
          max: 0
      };
      return initial;
  }
  /** State shared by all runtime contexts that belong to one observed task tree. */
  interface ProgressInfo {
      /** Update rate in milliseconds; compared against elapsed time before notifying the observer. */
      updateRateMs: number;
      /** Time of the most recent observer notification. */
      lastNotified: number;
      /** Callback that receives progress snapshots. */
      observer: Progress.Observer;
      /**
       * Abort state: `abortRequested` is set by `tryAbort`, `treeAborted` is set once the whole
       * progress tree has been marked as aborting, `reason` is the reason passed to the abort error.
       */
      abortToken: { abortRequested: boolean, treeAborted: boolean, reason: string };
      /** Id of the task this info was created for. */
      taskId: number;
      /** Root node of the progress tree. */
      root: Progress.Node;
      /** Requests an abort, optionally recording a reason. */
      tryAbort: (reason?: string) => void;
  }
  /**
   * Creates the shared progress state for a task tree rooted at `task`.
   *
   * @param task Root task; supplies the task id and the initial root progress record.
   * @param observer Callback that will receive progress snapshots.
   * @param updateRateMs Update rate in milliseconds.
   * @returns A new info object with a fresh abort token and a childless root node.
   */
  function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs: number): ProgressInfo {
      const token: ProgressInfo['abortToken'] = { abortRequested: false, treeAborted: false, reason: '' };
      const lastNotified = now();
      const taskId = task.id;
      const root: Progress.Node = { progress: defaultProgress(task), children: [] };
      // The abort function closes over the same token object that is stored on the info.
      const tryAbort = createAbortFunction(token);
      return { updateRateMs, lastNotified, observer, abortToken: token, taskId, root, tryAbort };
  }
  /**
   * Creates a callback that flags the given token as abort-requested.
   *
   * @param token Abort token mutated by the returned callback.
   * @returns A function that sets `abortRequested` and, when given a non-empty reason,
   *          replaces the token's reason; otherwise the previous reason is kept.
   */
  function createAbortFunction(token: ProgressInfo['abortToken']) {
      const requestAbort = (reason?: string): void => {
          token.abortRequested = true;
          if (reason) {
              token.reason = reason;
          }
      };
      return requestAbort;
  }
  /**
   * Copies a progress tree: each node gets a shallow copy of its progress record
   * and a new children array holding recursively cloned children.
   *
   * @param root Node to clone.
   * @returns The cloned node.
   */
  function cloneTree(root: Progress.Node): Progress.Node {
      const progress = { ...root.progress };
      const children = root.children.map(child => cloneTree(child));
      return { progress, children };
  }
  /**
   * Tells whether a whole progress subtree can be aborted.
   *
   * @param root Root of the subtree to check.
   * @returns True only when this node and every descendant have `canAbort` set.
   */
  function canAbort(root: Progress.Node): boolean {
      const own = root.progress.canAbort;
      if (!own) {
          return own;
      }
      return root.children.every(child => canAbort(child));
  }
  /**
   * Produces the immutable-by-copy view of the current progress that is handed to the observer.
   *
   * @param info Shared progress state.
   * @returns A cloned progress tree, the abortability of the live tree, and the abort callback.
   */
  function snapshotProgress(info: ProgressInfo): Progress {
      const { root, tryAbort } = info;
      const treeCopy = cloneTree(root);
      const abortable = canAbort(root);
      return { root: treeCopy, canAbort: abortable, requestAbort: tryAbort };
  }
  /**
   * Runs a task body inside the given context and handles the abort protocol.
   *
   * On success the task's result is returned, unless an abort was requested in the
   * meantime, in which case `abort` throws and the error is handled like any other abort.
   * On an abort error the function waits until all child nodes of the context have been
   * removed, then invokes the task's `__onAbort` hook if present. Every caught error is
   * re-thrown; it is additionally printed when `ExecuteObservable.PRINT_ERRORS_TO_STD_ERR` is set.
   *
   * @param task Task to run.
   * @param ctx Context bound to the task's progress node.
   * @returns The value produced by the task body.
   * @throws Whatever the task body throws, or the abort error raised by `abort`.
   */
  async function execute<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
      ctx.node.progress.startedTime = now();
      try {
          const ret = await task.__f(ctx);
          // The abort may have been requested while the body was finishing.
          if (ctx.info.abortToken.abortRequested) {
              abort(ctx.info, ctx.node);
          }
          return ret;
      } catch (e) {
          if (Task.isAbort(e)) {
              // wait for all child computations to go thru the abort phase.
              if (ctx.node.children.length > 0) {
                  // Promise<void> keeps `res` callable with no arguments, so it is
                  // assignable to the `() => void` callback slot on the context.
                  await new Promise<void>(res => { ctx.onChildrenFinished = res; });
              }
              if (task.__onAbort) task.__onAbort();
          }
          if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e);
          throw e;
      }
  }
  /**
   * Marks the whole progress tree as aborting (only once per tree), notifies the observer,
   * and then always throws the abort error carrying the recorded reason.
   *
   * @param info Shared progress state holding the abort token and the tree root.
   * @param node Node of the calling context (not used by the current implementation).
   * @throws The error created by `Task.Aborted` — on every call.
   */
  function abort(info: ProgressInfo, node: Progress.Node) {
      const token = info.abortToken;
      const firstAbort = !token.treeAborted;
      if (firstAbort) {
          // Flip the flag before touching the tree so later callers skip straight to the throw.
          token.treeAborted = true;
          abortTree(info.root);
          notifyObserver(info, now());
      }
      throw Task.Aborted(token.reason);
  }
  /**
   * Puts a node and all of its descendants into the "aborting" display state:
   * indeterminate, no longer abortable, with the message 'Aborting...'.
   *
   * @param root Root of the subtree to mark.
   */
  function abortTree(root: Progress.Node) {
      const { progress: state, children } = root;
      state.isIndeterminate = true;
      state.canAbort = false;
      state.message = 'Aborting...';
      children.forEach(child => abortTree(child));
  }
  /**
   * Decides whether enough time has passed since the last observer notification.
   *
   * @param info Shared progress state (reads `lastNotified` and `updateRateMs`).
   * @param time Current time, in the same units as `lastNotified`.
   * @returns True when strictly more than `updateRateMs` has elapsed.
   */
  function shouldNotify(info: ProgressInfo, time: number) {
      const elapsed = time - info.lastNotified;
      return elapsed > info.updateRateMs;
  }
  /**
   * Records the notification time and passes a fresh progress snapshot to the observer.
   *
   * @param info Shared progress state; its `lastNotified` is overwritten with `time`.
   * @param time Timestamp to record for this notification.
   */
  function notifyObserver(info: ProgressInfo, time: number) {
      info.lastNotified = time;
      info.observer(snapshotProgress(info));
  }
  /**
   * Runtime context bound to one node of the progress tree.
   *
   * All contexts of a task tree share a single `ProgressInfo`; each context owns the
   * progress record of its own node and can spawn child contexts through `runChild`.
   */
  class ObservableRuntimeContext implements RuntimeContext {
      isSynchronous = false;
      isExecuting = true;
      /** Time of the latest `update` call on this context; 0 until the first call. */
      lastUpdatedTime = 0;

      /** Progress tree node owned by this context. */
      node: Progress.Node;
      /** State shared by the whole task tree. */
      info: ProgressInfo;

      // used for waiting for cancelled computation trees
      onChildrenFinished?: () => void = void 0;

      /** Throws the abort error (via `abort`) when an abort has been requested. */
      private checkAborted() {
          if (this.info.abortToken.abortRequested) {
              abort(this.info, this.node);
          }
      }

      /**
       * True when more than `updateRateMs` has elapsed since the last `update` call.
       * Reading this property throws the abort error if an abort has been requested.
       */
      get shouldUpdate(): boolean {
          this.checkAborted();
          return now() - this.lastUpdatedTime > this.info.updateRateMs;
      }

      /**
       * Applies a progress update to this context's node.
       * A string replaces only the message; an object overwrites the fields it defines.
       * For object updates `isIndeterminate` is recomputed from `current`/`max` being
       * undefined and then overridden by an explicit `update.isIndeterminate`.
       * Throws the abort error first if an abort has been requested.
       */
      private updateProgress(update?: string | Partial<RuntimeContext.ProgressUpdate>) {
          this.checkAborted();

          if (!update) return;

          const progress = this.node.progress;
          if (typeof update === 'string') {
              progress.message = update;
          } else {
              if (typeof update.canAbort !== 'undefined') progress.canAbort = update.canAbort;
              if (typeof update.message !== 'undefined') progress.message = update.message;
              if (typeof update.current !== 'undefined') progress.current = update.current;
              if (typeof update.max !== 'undefined') progress.max = update.max;
              progress.isIndeterminate = typeof progress.current === 'undefined' || typeof progress.max === 'undefined';
              if (typeof update.isIndeterminate !== 'undefined') progress.isIndeterminate = update.isIndeterminate;
          }
      }

      /**
       * Records progress for this node and, at most once per update interval for the
       * whole tree, notifies the observer.
       *
       * @param progress New message or partial progress fields.
       * @param dontNotify When truthy, the observer is never notified by this call.
       * @returns Nothing when no notification happened; otherwise the promise from
       *          `Scheduler.immediatePromise()`.
       * @throws The abort error when an abort has been requested.
       */
      update(progress?: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void {
          // The progress tracking and observer notification are separated
          // because the computation can have a tree structure.
          // All nodes of the tree should be regularly updated at the specified frequency,
          // however, the notification should only be invoked once per the whole tree.

          this.lastUpdatedTime = now();
          this.updateProgress(progress);

          if (!!dontNotify || !shouldNotify(this.info, this.lastUpdatedTime)) return;

          notifyObserver(this.info, this.lastUpdatedTime);

          // The computation could have been aborted during the notification phase.
          this.checkAborted();

          return Scheduler.immediatePromise();
      }

      /**
       * Runs a task as a child of this context.
       *
       * @param task Child task to execute.
       * @param progress Optional progress update applied to this (parent) node first.
       * @returns The child's result, or `undefined` when the child ended with an abort error.
       * @throws Any non-abort error raised by the child.
       */
      async runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> {
          this.updateProgress(progress);

          // Create a new child context and add it to the progress tree.
          // When the child task finishes, remove the tree node.

          const node: Progress.Node = { progress: defaultProgress(task), children: [] };
          const children = this.node.children as Progress.Node[];
          children.push(node);
          const ctx = new ObservableRuntimeContext(this.info, node);
          try {
              return await execute(task, ctx);
          } catch (e) {
              if (Task.isAbort(e)) {
                  // need to catch the error here because otherwise
                  // promises for running child tasks in a tree-like computation
                  // will get orphaned and cause "uncaught error in Promise".
                  return void 0 as any;
              }
              throw e;
          } finally {
              // remove the progress node after the computation has finished.
              const idx = children.indexOf(node);
              if (idx >= 0) {
                  children.splice(idx, 1);
              }
              // Wake up a parent `execute` that is waiting for its children to finish aborting.
              if (children.length === 0 && this.onChildrenFinished) this.onChildrenFinished();
          }
      }

      constructor(info: ProgressInfo, node: Progress.Node) {
          this.node = node;
          this.info = info;
      }
  }
  190. export { ExecuteObservable }