computation.ts 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. /**
  2. * Copyright (c) 2017 molio contributors, licensed under MIT, See LICENSE file for more info.
  3. *
  4. * Adapted from https://github.com/dsehnal/LiteMol
  5. * @author David Sehnal <david.sehnal@gmail.com>
  6. */
  7. import Scheduler from './utils/scheduler'
  8. import timeNow from './utils/time'
  /**
   * A unit of asynchronous work. Calling it, optionally with a context used
   * for progress reporting and abort checks, yields a promise of the result.
   */
  interface Computation<A> {
      (ctx?: Computation.Context): Promise<A>
  }

  namespace Computation {
      /** When true, an error raised by a computation is written to console.error before the returned promise rejects. */
      export let PRINT_ERRORS_TO_CONSOLE = false;

      /** Wraps a context-taking async function into a Computation. */
      export function create<A>(computation: (ctx: Context) => Promise<A>): Computation<A> {
          return ComputationImpl(computation);
      }

      /** A computation that resolves to the given value. */
      export function resolve<A>(a: A): Computation<A> {
          return create<A>(() => Promise.resolve(a));
      }

      /** A computation that rejects with the given reason. */
      export function reject<A>(reason: any): Computation<A> {
          return create<A>(() => Promise.reject(reason));
      }

      /** Options accepted by `observable`. */
      export interface Params {
          /** Update rate, in milliseconds, handed to the created context. */
          updateRateMs?: number;
          /** Observer subscribed to the created context right away. */
          observer?: ProgressObserver;
      }

      /** Value thrown by an observable context once an abort has been requested. */
      export const Aborted = 'Aborted';

      /** Progress state delivered to observers. */
      export interface Progress {
          message: string;
          isIndeterminate: boolean;
          current: number;
          max: number;
          elapsedMs: number;
          requestAbort?: () => void;
      }

      /** Partial progress information passed to `Context.update`. */
      export interface ProgressUpdate {
          message?: string;
          /** `true`/`false` toggles the default abort handler; a function installs a custom one. */
          abort?: boolean | (() => void);
          current?: number;
          max?: number;
      }

      /** Execution context handed to a running computation. */
      export interface Context {
          readonly isSynchronous: boolean;
          /** Also checks if the computation was aborted. If so, throws. */
          readonly requiresUpdate: boolean;
          requestAbort(): void;
          subscribe(onProgress: ProgressObserver): { dispose: () => void };
          /** Also checks if the computation was aborted. If so, throws. */
          update(info: ProgressUpdate): Promise<void> | void;
      }

      /** Callback receiving read-only progress state. */
      export type ProgressObserver = (progress: Readonly<Progress>) => void;

      // Shared by every `synchronous.subscribe` call; there is never anything to dispose.
      const noopSubscription = { dispose: () => { } };

      /** A context without updates. */
      export const synchronous: Context = {
          isSynchronous: true,
          requiresUpdate: false,
          requestAbort() {
              // nothing to abort
          },
          subscribe(onProgress) {
              return noopSubscription;
          },
          update(info) {
              // progress is not tracked
          }
      };

      /** Creates an observable context, optionally with an update rate and an initial observer. */
      export function observable(params?: Partial<Params>) {
          const { updateRateMs, observer } = params || ({} as Partial<Params>);
          const context = new ObservableContext(updateRateMs);
          if (observer) context.subscribe(observer);
          return context;
      }

      /** Time source used for progress bookkeeping. */
      export const now = timeNow;

      /** A utility for splitting large computations into smaller parts. */
      export interface Chunker {
          setNextChunkSize(size: number): void;
          /** nextChunk must return the number of actually processed chunks. */
          process(
              nextChunk: (chunkSize: number) => number,
              update: (updater: Context['update']) => void,
              nextChunkSize?: number
          ): Promise<void>;
      }

      /** Creates a Chunker bound to `ctx` that starts with the given chunk size. */
      export function chunker(ctx: Context, nextChunkSize: number): Chunker {
          return new ChunkerImpl(ctx, nextChunkSize);
      }
  }
  /** Update rate, in milliseconds, used when a context is not given an explicit (non-zero) rate. */
  const DefaulUpdateRateMs = 150;
  /**
   * Turns a context-taking async function into a Computation.
   *
   * The returned function:
   *  - falls back to `Computation.synchronous` when no context is passed,
   *  - calls the context's optional `started` hook before and `finished` hook
   *    after the body (the hooks exist on observable contexts only),
   *  - logs the error when `Computation.PRINT_ERRORS_TO_CONSOLE` is set and
   *    rejects with that same error.
   *
   * Implemented as a plain async function rather than an async Promise
   * executor, so an exception thrown by a hook (e.g. unbalanced `finished`)
   * rejects the returned promise instead of being lost in a detached promise.
   * If both the body and `finished` fail, the `finished` error wins.
   *
   * @param computation The body to run; receives the effective context.
   * @returns A function that runs the body and resolves to its result.
   */
  function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> {
      return async (ctx?: Computation.Context): Promise<A> => {
          // Lifecycle hooks are optional: the synchronous context does not define them.
          const context = (ctx || Computation.synchronous) as Computation.Context & {
              started?(): void;
              finished?(): void;
          };
          try {
              if (context.started) context.started();
              return await computation(context);
          } catch (e) {
              if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e);
              throw e;
          } finally {
              if (context.finished) context.finished();
          }
      };
  }
  /**
   * Context that tracks progress, forwards snapshots of it to subscribed
   * observers and supports abort requests. `started`/`finished` calls may
   * nest; timing starts with the outermost `started` and observers are dropped
   * once the outermost `finished` is reached.
   */
  class ObservableContext implements Computation.Context {
      readonly updateRate: number;
      readonly isSynchronous: boolean = false;

      private level = 0;
      private startedTime = 0;
      private abortRequested = false;
      private lastUpdated = 0;
      private observers: Computation.ProgressObserver[] | undefined = void 0;
      private progress: Computation.Progress = {
          message: 'Working...',
          current: 0,
          max: 0,
          elapsedMs: 0,
          isIndeterminate: true,
          requestAbort: void 0
      };

      /** Time between the two most recent `update` calls (or since start for the first one). */
      lastDelta = 0;

      /** Default abort handler; may be replaced by a custom function passed to `update`. */
      private abortRequester = () => { this.abortRequested = true };

      /** Registers an observer; the returned handle removes it again. */
      subscribe = (obs: Computation.ProgressObserver) => {
          (this.observers || (this.observers = [])).push(obs);
          const dispose = () => {
              const list = this.observers;
              if (!list) return;
              const index = list.indexOf(obs);
              if (index < 0) return;
              // Swap-remove: the last observer takes the freed slot.
              list[index] = list[list.length - 1];
              list.pop();
          };
          return { dispose };
      }

      /** @param updateRate Update rate in milliseconds; a missing or zero value falls back to the default. */
      constructor(updateRate?: number) {
          this.updateRate = updateRate || DefaulUpdateRateMs;
      }

      /** Throws `Computation.Aborted` once the default abort handler has fired. */
      private checkAborted() {
          if (this.abortRequested) throw Computation.Aborted;
      }

      /** Invokes the current abort handler; anything it throws is ignored. */
      requestAbort() {
          const requester = this.abortRequester;
          if (!requester) return;
          try {
              requester.call(null);
          } catch (e) {
              // deliberately ignored
          }
      }

      /**
       * Records new progress information and schedules a notification of all
       * observers with one shared snapshot. Throws if the computation was aborted.
       *
       * @returns The promise produced by `Scheduler.immediatePromise()`.
       */
      update({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void {
          this.checkAborted();

          const now = Computation.now();
          const progress = this.progress;

          // A function replaces the abort handler; any truthy value exposes the handler to observers.
          if (typeof abort !== 'boolean' && abort) this.abortRequester = abort;
          progress.requestAbort = abort ? this.abortRequester : void 0;

          if (typeof message !== 'undefined') progress.message = message;
          progress.elapsedMs = now - this.startedTime;

          // Without a numeric `current` the progress is indeterminate and current/max keep their old values.
          progress.isIndeterminate = isNaN(current!);
          if (!progress.isIndeterminate) {
              progress.current = current!;
              if (!isNaN(max!)) progress.max = max!;
          }

          if (this.observers) {
              const snapshot = { ...progress };
              this.observers.forEach(observer => Scheduler.immediate(observer, snapshot));
          }

          this.lastDelta = now - this.lastUpdated;
          this.lastUpdated = now;

          return Scheduler.immediatePromise();
      }

      /** True when more than `updateRate` ms passed since the last update. Throws if the computation was aborted. */
      get requiresUpdate() {
          this.checkAborted();
          return !this.isSynchronous && Computation.now() - this.lastUpdated > this.updateRate;
      }

      /** Enters a (possibly nested) computation; the outermost call starts the clock. */
      started() {
          if (this.level === 0) {
              const now = Computation.now();
              this.startedTime = now;
              this.lastUpdated = now;
          }
          this.level += 1;
      }

      /** Leaves a computation; the outermost call drops all observers. Throws on unbalanced calls. */
      finished() {
          this.level -= 1;
          if (this.level < 0) {
              throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
          }
          if (this.level === 0) this.observers = void 0;
      }
  }
  /**
   * Drives a chunked computation: repeatedly asks the caller to process a
   * chunk and, whenever the context reports that an update is due, runs the
   * caller's update callback and re-estimates the next chunk size.
   */
  class ChunkerImpl implements Computation.Chunker {
      /** Items reported by nextChunk since the chunk size was last re-estimated. */
      private processedSinceUpdate = 0;
      /** `context.update` bound to its context, handed to the caller's update callback. */
      private updater: Computation.Context['update'];

      constructor(public context: Computation.Context, private nextChunkSize: number) {
          this.updater = this.context.update.bind(this.context);
      }

      setNextChunkSize(size: number) {
          this.nextChunkSize = size;
      }

      /**
       * @param nextChunk Processes up to the given number of items and returns how many it handled; a non-positive result ends the loop.
       * @param update Called with the bound context updater whenever an update is due.
       * @param nextChunkSize Optional new starting chunk size.
       */
      async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) {
          if (nextChunkSize !== undefined) this.setNextChunkSize(nextChunkSize);

          for (;;) {
              const processed = nextChunk(this.getNextChunkSize());
              if (!(processed > 0)) break;
              this.processedSinceUpdate += processed;
              if (this.context.requiresUpdate) await this.reportAndRetune(update);
          }

          // One more chance to report after the final chunk.
          if (this.context.requiresUpdate) await this.reportAndRetune(update);
      }

      /** Runs the caller's update callback, then recomputes the chunk size. */
      private async reportAndRetune(update: (updater: Computation.Context['update']) => Promise<void> | void) {
          await update(this.updater);
          this.nextChunkSize = this.computeChunkSize();
      }

      /**
       * Scales the number of items processed since the last estimate so that one
       * chunk takes about one update interval. Keeps the current size when the
       * context has no `lastDelta` yet.
       */
      private computeChunkSize() {
          const { lastDelta, updateRate } = this.context as ObservableContext;
          if (!lastDelta) return this.nextChunkSize;
          const rate = updateRate || DefaulUpdateRateMs;
          const estimate = Math.round(this.processedSinceUpdate * rate / lastDelta + 1);
          this.processedSinceUpdate = 0;
          return estimate;
      }

      /** A synchronous context never needs to yield, so everything is processed in a single chunk. */
      private getNextChunkSize() {
          return this.context.isSynchronous ? Number.MAX_SAFE_INTEGER : this.nextChunkSize;
      }
  }
  220. export default Computation;