observable.ts 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. /**
  2. * Copyright (c) 2017-2018 mol* contributors, licensed under MIT, See LICENSE file for more info.
  3. *
  4. * @author David Sehnal <david.sehnal@gmail.com>
  5. */
  6. import { Task } from '../task'
  7. import { RuntimeContext } from './runtime-context'
  8. import { Progress } from './progress'
  9. import { now } from '../util/now'
  10. import { Scheduler } from '../util/scheduler'
  11. import { UserTiming } from '../util/user-timing'
  /**
   * View of a Task that exposes the members this module calls directly
   * when running it.
   */
  interface ExposedTask<T> extends Task<T> {
      /** The task body; `execute` awaits it with the runtime context. */
      f: (ctx: RuntimeContext) => Promise<T>;
      /** Optional hook that `execute` calls after the task ended with an abort error. */
      onAbort?: () => void;
  }
  /**
   * Runs `task` as the root of a freshly created progress tree.
   *
   * @param task Task to execute.
   * @param observer Callback that receives progress snapshots.
   * @param updateRateMs Update interval in milliseconds stored in the shared
   *     progress info (defaults to 250).
   * @returns The promise produced by executing the task.
   */
  export function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) {
      const progressInfo = ProgressInfo(task, observer, updateRateMs);
      // The root context is bound to the root node of the new progress tree.
      const rootContext = new ObservableRuntimeContext(progressInfo, progressInfo.root);
      return execute(task as ExposedTask<T>, rootContext);
  }
  /**
   * Runs `task` directly inside an already existing runtime context.
   *
   * NOTE(review): `ctx` is cast to ObservableRuntimeContext without a runtime
   * check, so it is presumably expected to be a context created by this module.
   *
   * @param ctx Context to execute the task in.
   * @param task Task to execute.
   * @returns The promise produced by executing the task.
   */
  export function ExecuteInContext<T>(ctx: RuntimeContext, task: Task<T>) {
      const observableContext = ctx as ObservableRuntimeContext;
      const exposedTask = task as ExposedTask<T>;
      return execute(exposedTask, observableContext);
  }
  /**
   * Runs `task` as a child of the computation that owns `ctx` by delegating
   * to the context's `runChild` method.
   *
   * NOTE(review): `ctx` is cast to ObservableRuntimeContext without a runtime
   * check, so it is presumably expected to be a context created by this module.
   *
   * @param ctx Context of the parent computation.
   * @param task Child task to execute.
   * @param progress Optional progress update forwarded to `runChild`.
   * @returns The promise returned by `runChild`.
   */
  export function ExecuteObservableChild<T>(ctx: RuntimeContext, task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>) {
      const parentContext = ctx as ObservableRuntimeContext;
      return parentContext.runChild(task, progress);
  }
  export namespace ExecuteObservable {
      /**
       * When set to true, `execute` passes every error it catches to
       * `console.error` before rethrowing it. Off by default.
       */
      export let PRINT_ERRORS_TO_STD_ERR: boolean = false;
  }
  /**
   * Builds the initial progress record for a task: empty message, zeroed
   * counters and start time, abortable and indeterminate.
   *
   * @param task Task whose id and name are copied into the record.
   * @returns A new progress object.
   */
  function defaultProgress(task: Task<any>): Task.Progress {
      const { id: taskId, name: taskName } = task;
      const initial: Task.Progress = {
          taskId,
          taskName,
          message: '',
          startedTime: 0,
          canAbort: true,
          isIndeterminate: true,
          current: 0,
          max: 0
      };
      return initial;
  }
  /** State shared by every runtime context that belongs to one computation tree. */
  interface ProgressInfo {
      /** Update interval in milliseconds that elapsed times are compared against. */
      updateRateMs: number;
      /** Time passed to the most recent `notifyObserver` call (creation time initially). */
      lastNotified: number;
      /** Callback that receives the progress snapshots. */
      observer: Progress.Observer;
      /**
       * Abort state: `abortRequested` is set by `tryAbort`, `treeAborted` is
       * set once `abort` has marked the whole tree, `reason` is the abort reason.
       */
      abortToken: { abortRequested: boolean, treeAborted: boolean, reason: string };
      /** Id of the task the info was created for. */
      taskId: number;
      /** Root node of the progress tree. */
      root: Progress.Node;
      /** Requests an abort, optionally replacing the stored reason. */
      tryAbort: (reason?: string) => void;
  }
  /**
   * Creates the shared progress state for a new computation tree rooted at `task`.
   *
   * @param task Root task; supplies the task id and the root progress node.
   * @param observer Callback that receives the progress snapshots.
   * @param updateRateMs Update interval in milliseconds.
   * @returns A new ProgressInfo with a fresh, not yet triggered abort token.
   */
  function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs: number): ProgressInfo {
      const abortToken: ProgressInfo['abortToken'] = {
          abortRequested: false,
          treeAborted: false,
          reason: ''
      };
      const createdAt = now();
      const root: Progress.Node = { progress: defaultProgress(task), children: [] };
      // The abort function closes over the very token stored in the info object.
      const tryAbort = createAbortFunction(abortToken);
      return {
          updateRateMs,
          lastNotified: createdAt,
          observer,
          abortToken,
          taskId: task.id,
          root,
          tryAbort
      };
  }
  /**
   * Creates the abort-request callback bound to `token`.
   *
   * The callback only flags the request; the abort itself is carried out
   * when a context next checks the token.
   *
   * @param token Abort token to mutate.
   * @returns Function that marks the abort as requested and, when given a
   *     non-empty reason, stores it on the token.
   */
  function createAbortFunction(token: ProgressInfo['abortToken']) {
      return (reason?: string) => {
          token.abortRequested = true;
          // An empty or missing reason keeps whatever reason is already stored.
          if (reason) {
              token.reason = reason;
          }
      };
  }
  /**
   * Copies a progress tree: each node's progress record is shallow-copied
   * and its children are cloned recursively.
   *
   * @param root Node to copy.
   * @returns A new tree that shares no node or progress objects with `root`.
   */
  function cloneTree(root: Progress.Node): Progress.Node {
      const progress = { ...root.progress };
      const children = root.children.map(child => cloneTree(child));
      return { progress, children };
  }
  /**
   * Tells whether a whole subtree can be aborted.
   *
   * @param root Root of the subtree to inspect.
   * @returns True only if `root` and every descendant has `canAbort` set.
   */
  function canAbort(root: Progress.Node): boolean {
      // The subtree is abortable when no child subtree refuses to abort.
      return root.progress.canAbort && !root.children.some(child => !canAbort(child));
  }
  /**
   * Builds the progress value handed to the observer.
   *
   * @param info Shared progress state of the computation tree.
   * @returns A copy of the progress tree, whether the whole tree can currently
   *     be aborted, and the function that requests an abort.
   */
  function snapshotProgress(info: ProgressInfo): Progress {
      const treeCopy = cloneTree(info.root);
      const abortable = canAbort(info.root);
      return { root: treeCopy, canAbort: abortable, requestAbort: info.tryAbort };
  }
  /**
   * Runs the body of `task` in `ctx` and handles the abort protocol.
   *
   * The start time is recorded on the context's node. After the body has
   * completed successfully the user timing is closed and measured; if an
   * abort was requested in the meantime, the abort is raised at that point.
   * When the failure is an abort error, the function waits until the node has
   * no children left and then calls the task's `onAbort` hook. Every caught
   * error is rethrown.
   *
   * @param task Task to run.
   * @param ctx Context the task body receives.
   * @returns The value the task body resolved to.
   */
  async function execute<T>(task: ExposedTask<T>, ctx: ObservableRuntimeContext) {
      UserTiming.markStart(task);
      ctx.node.progress.startedTime = now();

      try {
          const result = await task.f(ctx);
          UserTiming.markEnd(task);
          UserTiming.measure(task);

          // An abort requested while the body was finishing still has to be
          // honoured; `abort` throws and the error lands in the catch below.
          if (ctx.info.abortToken.abortRequested) {
              abort(ctx.info, ctx.node);
          }

          return result;
      } catch (error) {
          if (Task.isAbort(error)) {
              // Let every child computation finish its own abort phase first.
              const hasPendingChildren = ctx.node.children.length > 0;
              if (hasPendingChildren) {
                  await new Promise(resolve => { ctx.onChildrenFinished = resolve; });
              }
              if (task.onAbort) {
                  task.onAbort();
              }
          }

          if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) {
              console.error(error);
          }
          throw error;
      }
  }
  /**
   * Raises the abort error for the computation tree.
   *
   * On the first call for a given tree, every node is switched to the
   * "Aborting..." state and the observer is notified once. Every call throws.
   *
   * @param info Shared progress state of the computation tree.
   * @param node Node of the calling context (not used by the body).
   * @throws The error created by `Task.Aborted` with the stored abort reason.
   */
  function abort(info: ProgressInfo, node: Progress.Node) {
      const token = info.abortToken;
      const isFirstAbort = !token.treeAborted;
      if (isFirstAbort) {
          token.treeAborted = true;
          abortTree(info.root);
          notifyObserver(info, now());
      }
      throw Task.Aborted(token.reason);
  }
  /**
   * Marks `root` and all of its descendants as being aborted: the message
   * becomes "Aborting...", and the node is flagged non-abortable and
   * indeterminate.
   *
   * @param root Root of the subtree to mark.
   */
  function abortTree(root: Progress.Node) {
      const state = root.progress;
      state.message = 'Aborting...';
      state.canAbort = false;
      state.isIndeterminate = true;
      root.children.forEach(child => abortTree(child));
  }
  /**
   * Tells whether more than `info.updateRateMs` has elapsed between the last
   * observer notification and `time`.
   *
   * In the visible code this helper is referenced only from a commented-out
   * condition in `ObservableRuntimeContext.update`.
   *
   * @param info Shared progress state of the computation tree.
   * @param time Timestamp to compare against `info.lastNotified`.
   * @returns True when the elapsed time exceeds the update rate.
   */
  function shouldNotify(info: ProgressInfo, time: number) {
      const elapsed = time - info.lastNotified;
      return elapsed > info.updateRateMs;
  }
  /**
   * Records `time` as the last notification time and hands a fresh progress
   * snapshot to the observer.
   *
   * @param info Shared progress state of the computation tree.
   * @param time Timestamp stored in `info.lastNotified`.
   */
  function notifyObserver(info: ProgressInfo, time: number) {
      info.lastNotified = time;
      info.observer(snapshotProgress(info));
  }
  /**
   * RuntimeContext implementation bound to a single node of a progress tree.
   * All contexts of one computation share the same ProgressInfo, and with it
   * the observer and the abort token.
   */
  class ObservableRuntimeContext implements RuntimeContext {
      isSynchronous = false;
      isExecuting = true;
      lastUpdatedTime = 0;

      node: Progress.Node;
      info: ProgressInfo;

      // Set by `execute` while an aborted task waits for its remaining
      // children; called by `runChild` once the last child has been removed.
      onChildrenFinished?: () => void = void 0;

      /**
       * @param info Progress state shared by the whole computation tree.
       * @param node Progress node owned by this context.
       */
      constructor(info: ProgressInfo, node: Progress.Node) {
          this.node = node;
          this.info = info;
      }

      /** Raises the abort error if an abort has been requested for the tree. */
      private checkAborted() {
          if (!this.info.abortToken.abortRequested) return;
          abort(this.info, this.node);
      }

      /**
       * True when more than the update rate has elapsed since this context
       * was last updated. Reading the property also performs the abort check,
       * so it can throw the abort error.
       */
      get shouldUpdate(): boolean {
          this.checkAborted();
          const elapsed = now() - this.lastUpdatedTime;
          return elapsed > this.info.updateRateMs;
      }

      /**
       * Applies a progress update to this context's node after checking for
       * a pending abort.
       *
       * A string update replaces the message and marks the progress as
       * indeterminate. An object update copies the fields that are defined;
       * `isIndeterminate` is taken from the update when given, otherwise it is
       * derived from whether `current` or `max` is undefined on the node.
       */
      private updateProgress(update?: string | Partial<RuntimeContext.ProgressUpdate>) {
          this.checkAborted();
          if (!update) return;

          const target = this.node.progress;

          if (typeof update === 'string') {
              target.message = update;
              target.isIndeterminate = true;
              return;
          }

          if (update.canAbort !== undefined) target.canAbort = update.canAbort;
          if (update.message !== undefined) target.message = update.message;
          if (update.current !== undefined) target.current = update.current;
          if (update.max !== undefined) target.max = update.max;

          // An explicit flag in the update wins over the derived value.
          target.isIndeterminate = update.isIndeterminate !== undefined
              ? update.isIndeterminate
              : (target.current === undefined || target.max === undefined);
      }

      /**
       * Updates this node's progress and, unless `dontNotify` is set, notifies
       * the observer.
       *
       * @param progress Optional message or partial progress update.
       * @param dontNotify When truthy, the observer is not notified and nothing is returned.
       * @returns The promise from `Scheduler.immediatePromise()` after a
       *     notification, otherwise undefined.
       */
      update(progress?: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void {
          // Progress tracking and observer notification are kept separate
          // because the computation can be a tree: every node should be
          // updated regularly at the specified frequency, while the
          // notification should happen only once for the whole tree.
          const timestamp = now();
          this.lastUpdatedTime = timestamp;
          this.updateProgress(progress);

          // Rate limiting with shouldNotify(this.info, this.lastUpdatedTime) is
          // currently disabled, so every call without dontNotify notifies.
          if (dontNotify) return;

          notifyObserver(this.info, timestamp);

          // The computation could have been aborted during the notification phase.
          this.checkAborted();

          return Scheduler.immediatePromise();
      }

      /**
       * Runs `task` as a child of this context.
       *
       * A new node is appended to this node's children for the duration of
       * the child task and removed again when the task settles. An abort
       * error raised by the child is swallowed and resolves to undefined;
       * any other error is rethrown.
       *
       * @param task Child task to execute.
       * @param progress Optional progress update applied to this (parent) node first.
       * @returns The child's result, or undefined when the child was aborted.
       */
      async runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> {
          this.updateProgress(progress);

          // Register a node for the child in the progress tree.
          const childNode: Progress.Node = { progress: defaultProgress(task), children: [] };
          const siblings = this.node.children as Progress.Node[];
          siblings.push(childNode);

          const childContext = new ObservableRuntimeContext(this.info, childNode);

          try {
              return await execute(task as ExposedTask<T>, childContext);
          } catch (error) {
              if (!Task.isAbort(error)) throw error;
              // The abort has to be swallowed here; otherwise the promises of
              // child tasks in a tree-like computation would get orphaned and
              // cause "uncaught error in Promise".
              return void 0 as any;
          } finally {
              // Drop the child's node now that its computation has finished.
              const position = siblings.indexOf(childNode);
              if (position >= 0) {
                  siblings.splice(position, 1);
              }
              if (siblings.length === 0 && this.onChildrenFinished) this.onChildrenFinished();
          }
      }
  }