observable.ts 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. /**
  2. * Copyright (c) 2017-2018 mol* contributors, licensed under MIT, See LICENSE file for more info.
  3. *
  4. * @author David Sehnal <david.sehnal@gmail.com>
  5. */
  6. import { Task } from '../task'
  7. import { RuntimeContext } from './runtime-context'
  8. import { Progress } from './progress'
  9. import { now } from '../../mol-util/now';
  10. import { Scheduler } from '../util/scheduler'
  11. import { UserTiming } from '../util/user-timing'
  12. import { isDebugMode } from '../../mol-util/debug'
  /**
   * Internal view of a `Task` that exposes the members the executor needs.
   * Public `Task<T>` values are cast to this shape before being run.
   */
  interface ExposedTask<T> extends Task<T> {
      /** The task body; invoked with the runtime context the task runs in. */
      f: (ctx: RuntimeContext) => Promise<T>;
      /** Optional callback invoked by the executor after the task was aborted. */
      onAbort?: () => void;
  }
  /**
   * Runs `task` as the root of a new progress tree that reports to `observer`.
   *
   * @param task The task to execute.
   * @param observer Callback that receives progress snapshots.
   * @param updateRateMs Update-rate threshold in milliseconds stored on the shared progress info (defaults to 250).
   * @returns The promise produced by executing the task.
   */
  export function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) {
      const progressInfo = ProgressInfo(task, observer, updateRateMs);
      const rootContext = new ObservableRuntimeContext(progressInfo, progressInfo.root);
      const exposed = task as ExposedTask<T>;
      return execute(exposed, rootContext);
  }
  /**
   * Builds a root observable runtime context for `task` without executing it.
   *
   * @param task The task whose id and name seed the root progress node.
   * @param observer Callback that receives progress snapshots.
   * @param updateRateMs Update-rate threshold in milliseconds stored on the shared progress info (defaults to 250).
   * @returns A new context bound to the root node of a fresh progress tree.
   */
  export function CreateObservableCtx<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) {
      const progressInfo = ProgressInfo(task, observer, updateRateMs);
      const { root } = progressInfo;
      return new ObservableRuntimeContext(progressInfo, root);
  }
  /**
   * Executes `task` directly in an already existing context.
   *
   * The context is cast to `ObservableRuntimeContext` without a runtime check,
   * so the caller is responsible for passing a context of that kind.
   *
   * @param ctx The runtime context to run the task in.
   * @param task The task to execute.
   * @returns The promise produced by executing the task.
   */
  export function ExecuteInContext<T>(ctx: RuntimeContext, task: Task<T>) {
      const observableCtx = ctx as ObservableRuntimeContext;
      const exposed = task as ExposedTask<T>;
      return execute(exposed, observableCtx);
  }
  /**
   * Runs `task` as a child of the computation owning `ctx`.
   *
   * The context is cast to `ObservableRuntimeContext` without a runtime check
   * and the call is delegated to its `runChild` method.
   *
   * @param ctx The parent runtime context.
   * @param task The child task to execute.
   * @param progress Optional progress update forwarded to `runChild`.
   * @returns Whatever `runChild` returns for the child task.
   */
  export function ExecuteObservableChild<T>(ctx: RuntimeContext, task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>) {
      const parent = ctx as ObservableRuntimeContext;
      const child = task as ExposedTask<T>;
      return parent.runChild(child, progress);
  }
  /**
   * Creates the initial progress record for a task: empty message, start time 0,
   * abortable, indeterminate, and `current`/`max` both 0.
   *
   * @param task The task whose `id` and `name` are copied into the record.
   * @returns A fresh progress object for the task.
   */
  function defaultProgress(task: Task<any>): Task.Progress {
      const { id: taskId, name: taskName } = task;
      const initial: Task.Progress = {
          taskId,
          taskName,
          message: '',
          startedTime: 0,
          canAbort: true,
          isIndeterminate: true,
          current: 0,
          max: 0
      };
      return initial;
  }
  /** State shared by every runtime context that belongs to one progress tree. */
  interface ProgressInfo {
      /** Threshold in milliseconds used by a context's `shouldUpdate` check. */
      updateRateMs: number;
      /** Time of the most recent observer notification (creation time initially). */
      lastNotified: number;
      /** Callback that receives progress snapshots. */
      observer: Progress.Observer;
      /**
       * Abort bookkeeping: `abortRequested` is raised by `tryAbort`, `treeAborted`
       * is raised once the tree has been marked as aborting, and `reason` is the
       * text handed to `Task.Aborted`.
       */
      abortToken: { abortRequested: boolean; treeAborted: boolean; reason: string };
      /** Id of the task the tree was created for. */
      taskId: number;
      /** Root node of the progress tree. */
      root: Progress.Node;
      /** Requests an abort of the computation, optionally recording a reason. */
      tryAbort: (reason?: string) => void;
  }
  /**
   * Creates the shared progress state for a new computation tree rooted at `task`.
   *
   * @param task The root task; its id is recorded and it seeds the root progress node.
   * @param observer Callback that receives progress snapshots.
   * @param updateRateMs Update-rate threshold in milliseconds.
   * @returns A new `ProgressInfo` with a fresh abort token and no child nodes.
   */
  function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs: number): ProgressInfo {
      const abortToken: ProgressInfo['abortToken'] = {
          abortRequested: false,
          treeAborted: false,
          reason: ''
      };
      // Evaluated in the same order as the fields appear in the result object.
      const lastNotified = now();
      const root: Progress.Node = { progress: defaultProgress(task), children: [] };
      const tryAbort = createAbortFunction(abortToken);
      return { updateRateMs, lastNotified, observer, abortToken, taskId: task.id, root, tryAbort };
  }
  /**
   * Builds the abort-request callback for a progress tree.
   *
   * @param token The abort token the returned callback mutates.
   * @returns A function that flags the token as abort-requested and, when a
   *          non-empty reason is supplied, replaces the stored reason.
   */
  function createAbortFunction(token: ProgressInfo['abortToken']) {
      return function requestAbort(reason?: string): void {
          token.abortRequested = true;
          // An omitted or empty reason keeps whatever reason is already stored.
          if (reason) token.reason = reason;
      };
  }
  /**
   * Copies a progress tree: every node and its `progress` object is duplicated
   * (one level deep per progress object), so later changes to the live tree do
   * not affect the copy.
   *
   * @param root The root of the (sub)tree to copy.
   * @returns A new tree with the same structure and progress values.
   */
  function cloneTree(root: Progress.Node): Progress.Node {
      const progress = { ...root.progress };
      const children = root.children.map(child => cloneTree(child));
      return { progress, children };
  }
  /**
   * Tells whether the whole (sub)tree can be aborted, i.e. the node itself and
   * every descendant report `canAbort`.
   *
   * @param root The root of the (sub)tree to inspect.
   * @returns `true` only when no node in the tree forbids aborting.
   */
  function canAbort(root: Progress.Node): boolean {
      const { progress, children } = root;
      // Short-circuits: children are only inspected when this node is abortable.
      return progress.canAbort && children.every(child => canAbort(child));
  }
  /**
   * Produces the progress object handed to the observer: a copy of the current
   * tree, whether the whole tree can be aborted, and the abort-request callback.
   *
   * @param info The shared progress state to snapshot.
   * @returns A snapshot that is detached from the live progress tree.
   */
  function snapshotProgress(info: ProgressInfo): Progress {
      const root = cloneTree(info.root);
      const abortable = canAbort(info.root);
      return { root, canAbort: abortable, requestAbort: info.tryAbort };
  }
  /**
   * Runs the body of `task` inside `ctx` and resolves with its result.
   *
   * The start time is written to the context's progress node and the run is
   * bracketed with `UserTiming` marks (the end mark and measurement are only
   * recorded when the task body completes without throwing).
   *
   * If an abort was requested by the time the body finishes, the result is
   * discarded and the abort is raised instead. When the body (or that check)
   * throws an abort error, the context is flagged as aborted, the function
   * waits until all still-registered child nodes have been removed, and then
   * calls the task's optional `onAbort` callback. Every error is rethrown; in
   * debug mode it is logged first.
   *
   * @param task The task to run.
   * @param ctx The context (and progress node) the task runs in.
   * @returns The value the task body resolved with.
   * @throws Whatever the task body throws, or the abort error.
   */
  async function execute<T>(task: ExposedTask<T>, ctx: ObservableRuntimeContext) {
      UserTiming.markStart(task);
      ctx.node.progress.startedTime = now();
      try {
          const ret = await task.f(ctx);
          UserTiming.markEnd(task);
          UserTiming.measure(task);
          // An abort requested while the body was finishing still wins over the result.
          if (ctx.info.abortToken.abortRequested) {
              abort(ctx.info);
          }
          return ret;
      } catch (e) {
          if (Task.isAbort(e)) {
              ctx.isAborted = true;
              // wait for all child computations to go thru the abort phase.
              if (ctx.node.children.length > 0) {
                  // Explicit `Promise<void>` so that `res` is assignable to the
                  // zero-argument `onChildrenFinished` callback.
                  await new Promise<void>(res => { ctx.onChildrenFinished = res; });
              }
              if (task.onAbort) {
                  task.onAbort();
              }
          }
          if (isDebugMode) console.error(e);
          throw e;
      }
  }
  /**
   * Aborts the computation described by `info`.
   *
   * On the first call for a given tree, every node is switched to the
   * "Aborting..." state and the observer is notified once; later calls skip
   * that step. The function never returns normally.
   *
   * @param info The shared progress state of the tree being aborted.
   * @throws Always throws `Task.Aborted` carrying the recorded abort reason.
   */
  function abort(info: ProgressInfo): never {
      if (!info.abortToken.treeAborted) {
          // Flag first so that re-entrant calls do not mark/notify again.
          info.abortToken.treeAborted = true;
          abortTree(info.root);
          notifyObserver(info, now());
      }
      throw Task.Aborted(info.abortToken.reason);
  }
  /**
   * Marks a node and all of its descendants as being aborted: the progress
   * becomes indeterminate, can no longer be aborted, and shows 'Aborting...'.
   *
   * @param root The root of the (sub)tree to mark; mutated in place.
   */
  function abortTree(root: Progress.Node) {
      const { progress, children } = root;
      progress.isIndeterminate = true;
      progress.canAbort = false;
      progress.message = 'Aborting...';
      children.forEach(child => abortTree(child));
  }
  121. // function shouldNotify(info: ProgressInfo, time: number) {
  122. // return time - info.lastNotified > info.updateRateMs;
  123. // }
  /**
   * Records `time` as the last notification time and passes a fresh snapshot
   * of the progress tree to the observer.
   *
   * @param info The shared progress state to report.
   * @param time The timestamp to store in `info.lastNotified`.
   */
  function notifyObserver(info: ProgressInfo, time: number) {
      info.lastNotified = time;
      info.observer(snapshotProgress(info));
  }
  /**
   * Runtime context handed to tasks that are executed with progress observation.
   *
   * Each context owns one node of the progress tree (`node`) and shares the
   * per-tree state (`info`: observer, abort token, update rate) with every
   * other context of the same computation.
   */
  class ObservableRuntimeContext implements RuntimeContext {
      isSynchronous = false;
      isExecuting = true;
      /** Timestamp of the most recent `update` call on this context. */
      lastUpdatedTime = 0;
      /** Set once this context has observed an abort. */
      isAborted?: boolean;
      /** The progress-tree node owned by this context. */
      node: Progress.Node;
      /** State shared by the whole progress tree. */
      info: ProgressInfo;
      // used for waiting for cancelled computation trees
      onChildrenFinished?: () => void = void 0;

      constructor(info: ProgressInfo, node: Progress.Node) {
          this.node = node;
          this.info = info;
      }

      /** Raises the abort (by throwing) when an abort has been requested for the tree. */
      private checkAborted() {
          if (this.info.abortToken.abortRequested) {
              this.isAborted = true;
              abort(this.info);
          }
      }

      /**
       * True when more than `updateRateMs` milliseconds have passed since the
       * last `update` call. Reading it throws if an abort has been requested.
       */
      get shouldUpdate(): boolean {
          this.checkAborted();
          return now() - this.lastUpdatedTime > this.info.updateRateMs;
      }

      /**
       * Applies a progress update to this context's node (after checking for a
       * requested abort). A string sets the message and makes the progress
       * indeterminate; an object updates only the fields it defines.
       */
      private updateProgress(update?: string | Partial<RuntimeContext.ProgressUpdate>) {
          this.checkAborted();
          if (!update) return;
          const progress = this.node.progress;
          if (typeof update === 'string') {
              progress.message = update;
              progress.isIndeterminate = true;
          } else {
              if (typeof update.canAbort !== 'undefined') progress.canAbort = update.canAbort;
              if (typeof update.message !== 'undefined') progress.message = update.message;
              if (typeof update.current !== 'undefined') progress.current = update.current;
              if (typeof update.max !== 'undefined') progress.max = update.max;
              // NOTE(review): `current` and `max` start at 0 and are never assigned
              // `undefined` above, so this evaluates to false for every object
              // update; only an explicit `update.isIndeterminate` can set it to true.
              // Behaviour is kept as is - confirm whether that is intended.
              progress.isIndeterminate = typeof progress.current === 'undefined' || typeof progress.max === 'undefined';
              if (typeof update.isIndeterminate !== 'undefined') progress.isIndeterminate = update.isIndeterminate;
          }
      }

      /**
       * Records a progress update and, unless `dontNotify` is set, notifies the
       * observer and yields to the scheduler.
       *
       * @param progress Optional message or partial progress update.
       * @param dontNotify When truthy, only the node is updated and nothing is returned.
       * @returns A promise from `Scheduler.immediatePromise()` when the observer
       *          was notified, otherwise `undefined`.
       * @throws The abort error when an abort has been requested.
       */
      update(progress?: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void {
          // The progress tracking and observer notification are separated
          // because the computation can have a tree structure.
          // All nodes of the tree should be regularly updated at the specified frequency,
          // however, the notification should only be invoked once per the whole tree.
          this.lastUpdatedTime = now();
          this.updateProgress(progress);
          // TODO: do the shouldNotify check here?
          if (!!dontNotify /*|| !shouldNotify(this.info, this.lastUpdatedTime)*/) return;
          notifyObserver(this.info, this.lastUpdatedTime);
          // The computation could have been aborted during the notification phase.
          this.checkAborted();
          return Scheduler.immediatePromise();
      }

      /**
       * Runs `task` as a child of this context: a new node is appended to this
       * node's children for the duration of the run and removed afterwards.
       *
       * @param task The child task to execute.
       * @param progress Optional progress update applied to this (parent) node first.
       * @returns The child's result. If the child was aborted and this context is
       *          itself flagged as aborted, resolves with `undefined` instead of rejecting.
       * @throws Any non-abort error from the child, or the abort error when this
       *         context has not been flagged as aborted.
       */
      async runChild<T>(task: ExposedTask<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> {
          this.updateProgress(progress);
          // Create a new child context and add it to the progress tree.
          // When the child task finishes, remove the tree node.
          const node: Progress.Node = { progress: defaultProgress(task), children: [] };
          const children = this.node.children as Progress.Node[];
          children.push(node);
          const ctx = new ObservableRuntimeContext(this.info, node);
          try {
              return await execute(task as ExposedTask<T>, ctx);
          } catch (e) {
              if (Task.isAbort(e)) {
                  // need to catch the error here because otherwise
                  // promises for running child tasks in a tree-like computation
                  // will get orphaned and cause "uncaught error in Promise".
                  if (this.isAborted) return void 0 as any;
              }
              throw e;
          } finally {
              // remove the progress node after the computation has finished.
              const idx = children.indexOf(node);
              if (idx >= 0) children.splice(idx, 1);
              // Wake up an aborting parent that is waiting for its last child.
              if (children.length === 0 && this.onChildrenFinished) this.onChildrenFinished();
          }
      }
  }