/** * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info. * * Adapted from https://github.com/dsehnal/LiteMol * @author David Sehnal */ import Scheduler from './scheduler' import timeNow from './util/now' interface Computation { (ctx?: Computation.Context): Promise } namespace Computation { export let PRINT_ERRORS_TO_CONSOLE = false; export function create(computation: (ctx: Context) => Promise) { return ComputationImpl(computation); } export function resolve(a: A) { return create(_ => Promise.resolve(a)); } export function reject(reason: any) { return create(_ => Promise.reject(reason)); } export interface Params { updateRateMs?: number, observer?: ProgressObserver } export const Aborted = 'Aborted'; export interface Progress { message: string, isIndeterminate: boolean, current: number, max: number, elapsedMs: number, requestAbort?: () => void } export interface ProgressUpdate { message?: string, abort?: boolean | (() => void), current?: number, max?: number } export interface Context { readonly isSynchronous: boolean, /** Also checks if the computation was aborted. If so, throws. */ readonly requiresUpdate: boolean, requestAbort(): void, subscribe(onProgress: ProgressObserver): { dispose: () => void }, /** Also checks if the computation was aborted. If so, throws. */ update(info: ProgressUpdate): Promise | void } export type ProgressObserver = (progress: Readonly) => void; const emptyDisposer = { dispose: () => { } } /** A context without updates. */ export const synchronous: Context = { isSynchronous: true, requiresUpdate: false, requestAbort() { }, subscribe(onProgress) { return emptyDisposer; }, update(info) { } } export function observable(params?: Partial) { const ret = new ObservableContext(params && params.updateRateMs); if (params && params.observer) ret.subscribe(params.observer); return ret; } export const now = timeNow; /** A utility for splitting large computations into smaller parts. */ export interface Chunker { setNextChunkSize(size: number): void, /** nextChunk must return the number of actually processed chunks. */ process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => void, nextChunkSize?: number): Promise } export function chunker(ctx: Context, nextChunkSize: number): Chunker { return new ChunkerImpl(ctx, nextChunkSize); } } const DefaulUpdateRateMs = 150; function ComputationImpl(computation: (ctx: Computation.Context) => Promise): Computation { return (ctx?: Computation.Context) => { const context: ObservableContext = ctx ? ctx : Computation.synchronous as any; return new Promise(async (resolve, reject) => { try { if (context.started) context.started(); const result = await computation(context); resolve(result); } catch (e) { if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e); reject(e); } finally { if (context.finished) context.finished(); } }); } } class ObservableContext implements Computation.Context { readonly updateRate: number; readonly isSynchronous: boolean = false; private level = 0; private startedTime = 0; private abortRequested = false; private lastUpdated = 0; private observers: Computation.ProgressObserver[] | undefined = void 0; private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 }; private checkAborted() { if (this.abortRequested) throw Computation.Aborted; } private abortRequester = () => { this.abortRequested = true }; subscribe = (obs: Computation.ProgressObserver) => { if (!this.observers) this.observers = []; this.observers.push(obs); return { dispose: () => { if (!this.observers) return; for (let i = 0; i < this.observers.length; i++) { if (this.observers[i] === obs) { this.observers[i] = this.observers[this.observers.length - 1]; this.observers.pop(); return; } } } }; } requestAbort() { try { if (this.abortRequester) { this.abortRequester.call(null); } } catch (e) { } } update({ message, abort, current, max }: Computation.ProgressUpdate): Promise | void { this.checkAborted(); const time = Computation.now(); if (typeof abort === 'boolean') { this.progress.requestAbort = abort ? this.abortRequester : void 0; } else { if (abort) this.abortRequester = abort; this.progress.requestAbort = abort ? this.abortRequester : void 0; } if (typeof message !== 'undefined') this.progress.message = message; this.progress.elapsedMs = time - this.startedTime; if (isNaN(current!)) { this.progress.isIndeterminate = true; } else { this.progress.isIndeterminate = false; this.progress.current = current!; if (!isNaN(max!)) this.progress.max = max!; } if (this.observers) { const p = { ...this.progress }; for (let i = 0, _i = this.observers.length; i < _i; i++) { Scheduler.immediate(this.observers[i], p); } } this.lastUpdated = time; return Scheduler.immediatePromise(); } get requiresUpdate() { this.checkAborted(); if (this.isSynchronous) return false; return Computation.now() - this.lastUpdated > this.updateRate; } started() { if (!this.level) { this.startedTime = Computation.now(); this.lastUpdated = this.startedTime; } this.level++; } finished() { this.level--; if (this.level < 0) { throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.'); } if (!this.level) this.observers = void 0; } constructor(updateRate?: number) { this.updateRate = updateRate || DefaulUpdateRateMs; } } class ChunkerImpl implements Computation.Chunker { private processedSinceUpdate = 0; private updater: Computation.Context['update']; private computeChunkSize(delta: number) { if (!delta) { this.processedSinceUpdate = 0; return this.nextChunkSize; } const rate = (this.context as ObservableContext).updateRate || DefaulUpdateRateMs; const ret = Math.round(this.processedSinceUpdate * rate / delta + 1); this.processedSinceUpdate = 0; return ret; } private getNextChunkSize() { const ctx = this.context as ObservableContext; // be smart if the computation is synchronous and process the whole chunk at once. if (ctx.isSynchronous) return Number.MAX_SAFE_INTEGER; return this.nextChunkSize; } setNextChunkSize(size: number) { this.nextChunkSize = size; } async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise | void, nextChunkSize?: number) { if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize); this.processedSinceUpdate = 0; // track time for the actual computation and exclude the "update time" let chunkStart = Computation.now(); let lastChunkSize: number; let chunkCount = 0; let totalSize = 0; let updateCount = 0; while ((lastChunkSize = nextChunk(this.getNextChunkSize())) > 0) { chunkCount++; this.processedSinceUpdate += lastChunkSize; totalSize += lastChunkSize; if (this.context.requiresUpdate) { let time = Computation.now(); await update(this.updater); this.nextChunkSize = updateCount > 0 ? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1)) : this.computeChunkSize(time - chunkStart) updateCount++; chunkStart = Computation.now(); } } if (this.context.requiresUpdate) { let time = Computation.now(); await update(this.updater); this.nextChunkSize = updateCount > 0 ? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1)) : this.computeChunkSize(time - chunkStart) } } constructor(public context: Computation.Context, private nextChunkSize: number) { this.updater = this.context.update.bind(this.context); } } export default Computation;