123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- /**
- * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
- *
- * Adapted from https://github.com/dsehnal/LiteMol
- * @author David Sehnal <david.sehnal@gmail.com>
- */
- import Scheduler from './scheduler'
- import timeNow from './util/now'
/**
 * A unit of asynchronous work that can be started by calling it.
 *
 * The declaration merges with the `Computation` namespace of the same name,
 * which holds the factory functions and the supporting types.
 */
interface Computation<A> {
    /**
     * Starts the computation.
     *
     * @param ctx Context used for progress reporting and abort checks. It is
     *            optional; `ComputationImpl` substitutes
     *            `Computation.synchronous` when it is omitted.
     * @returns A promise of the computed value.
     */
    (ctx?: Computation.Context): Promise<A>;
}
/**
 * Factories, contexts and supporting types for `Computation` values.
 */
namespace Computation {
    /** When true, an error caught while running a computation is passed to `console.error` before the promise rejects. */
    export let PRINT_ERRORS_TO_CONSOLE = false;

    /** Turns a function of a context into a `Computation`. */
    export function create<A>(computation: (ctx: Context) => Promise<A>) {
        return ComputationImpl(computation);
    }

    /** Builds a computation that resolves with the given value. */
    export function resolve<A>(a: A) {
        return create<A>(() => Promise.resolve(a));
    }

    /** Builds a computation that rejects with the given reason. */
    export function reject<A>(reason: any) {
        return create<A>(() => Promise.reject(reason));
    }

    /** Options accepted by `observable`. */
    export interface Params {
        /** Forwarded to the `ObservableContext` constructor as its update rate. */
        updateRateMs?: number;
        /** Subscribed to the new context right after it is created. */
        observer?: ProgressObserver;
    }

    /** The value thrown by `ObservableContext` once an abort has been requested. */
    export const Aborted = 'Aborted';

    /** Snapshot of the progress state delivered to observers. */
    export interface Progress {
        message: string;
        /** True when the last update carried no numeric `current` value. */
        isIndeterminate: boolean;
        current: number;
        max: number;
        /** Time between the start of the computation and the last update. */
        elapsedMs: number;
        /** Present only while the computation declares itself abortable. */
        requestAbort?: () => void;
    }

    /** Partial progress information passed to `Context.update`. */
    export interface ProgressUpdate {
        message?: string;
        /**
         * A boolean switches the abort option on or off; a function additionally
         * replaces the handler that is invoked when an abort is requested.
         */
        abort?: boolean | (() => void);
        current?: number;
        max?: number;
    }

    /** The object a running computation uses to report progress and to honor aborts. */
    export interface Context {
        readonly isSynchronous: boolean;
        /** Also checks if the computation was aborted. If so, throws. */
        readonly requiresUpdate: boolean;
        requestAbort(): void;
        subscribe(onProgress: ProgressObserver): { dispose: () => void };
        /** Also checks if the computation was aborted. If so, throws. */
        update(info: ProgressUpdate): Promise<void> | void;
    }

    /** Callback notified with a read-only progress snapshot. */
    export type ProgressObserver = (progress: Readonly<Progress>) => void;

    // Shared by every `synchronous.subscribe` call; there is never anything to release.
    const emptyDisposer = { dispose: () => { } };

    /** A context without updates. */
    export const synchronous: Context = {
        isSynchronous: true,
        requiresUpdate: false,
        requestAbort() { },
        subscribe(_onProgress) { return emptyDisposer; },
        update(_info) { }
    };

    /** Creates an `ObservableContext`, optionally with a custom update rate and an initial observer. */
    export function observable(params?: Partial<Params>) {
        const { updateRateMs, observer } = params || ({} as Partial<Params>);
        const context = new ObservableContext(updateRateMs);
        if (observer) context.subscribe(observer);
        return context;
    }

    /** The clock used for all progress timing. */
    export const now = timeNow;

    /** A utility for splitting large computations into smaller parts. */
    export interface Chunker {
        setNextChunkSize(size: number): void;
        /** nextChunk must return the number of actually processed chunks. */
        process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => void, nextChunkSize?: number): Promise<void>;
    }

    /** Creates a `Chunker` bound to `ctx` that starts with the given chunk size. */
    export function chunker(ctx: Context, nextChunkSize: number): Chunker {
        return new ChunkerImpl(ctx, nextChunkSize);
    }
}
/** Update interval, in milliseconds, used when a context is created without an explicit rate. */
const DefaulUpdateRateMs = 150;

/**
 * Wraps `computation` so that each run is bracketed by the context's optional
 * `started`/`finished` lifecycle hooks.
 *
 * The wrapper is a plain async function rather than `new Promise(async ...)`.
 * With an async executor, an exception thrown by `finished()` in the `finally`
 * block was lost as an unhandled rejection of the executor's own promise, while
 * the promise handed to the caller had already settled. Here it rejects the
 * returned promise.
 *
 * @param computation The work to run; it receives the effective context.
 * @returns A computation that falls back to `Computation.synchronous` when
 *          called without a context. Its promise rejects with whatever
 *          `computation` (or a lifecycle hook) throws or rejects with.
 */
function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> {
    return async (ctx?: Computation.Context): Promise<A> => {
        // Only some contexts (e.g. ObservableContext) carry the lifecycle hooks,
        // so they are modelled as optional members instead of casting to `any`.
        const context: Computation.Context & { started?(): void, finished?(): void } = ctx ? ctx : Computation.synchronous;
        try {
            if (context.started) context.started();
            return await computation(context);
        } catch (e) {
            if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e);
            throw e;
        } finally {
            // Runs on success and on failure so nested start/finish counts stay balanced.
            if (context.finished) context.finished();
        }
    };
}
/**
 * A context that records progress, notifies subscribed observers through the
 * scheduler and supports cooperative aborting.
 */
class ObservableContext implements Computation.Context {
    readonly updateRate: number;
    readonly isSynchronous: boolean = false;

    /** Nesting depth of computations currently running on this context. */
    private level = 0;
    private startedTime = 0;
    private abortRequested = false;
    private lastUpdated = 0;
    private observers: Computation.ProgressObserver[] | undefined = void 0;
    private progress: Computation.Progress = {
        message: 'Working...',
        current: 0,
        max: 0,
        elapsedMs: 0,
        isIndeterminate: true,
        requestAbort: void 0
    };

    constructor(updateRate?: number) {
        // A missing (or zero) rate falls back to the default.
        this.updateRate = updateRate || DefaulUpdateRateMs;
    }

    /** Throws `Computation.Aborted` once an abort has been requested. */
    private checkAborted() {
        if (!this.abortRequested) return;
        throw Computation.Aborted;
    }

    /** Default abort handler; `update` may replace it with a caller-supplied function. */
    private abortRequester = () => { this.abortRequested = true };

    /**
     * Registers an observer.
     * @returns A handle whose `dispose` removes the observer again (at most one registration per call).
     */
    subscribe = (obs: Computation.ProgressObserver) => {
        (this.observers || (this.observers = [])).push(obs);
        const dispose = () => {
            const registered = this.observers;
            if (!registered) return;
            const at = registered.indexOf(obs);
            if (at < 0) return;
            // Order is not preserved: the last observer takes the freed slot.
            registered[at] = registered[registered.length - 1];
            registered.pop();
        };
        return { dispose };
    }

    /** Invokes the current abort handler; errors thrown by the handler are ignored. */
    requestAbort() {
        const handler = this.abortRequester;
        if (!handler) return;
        try {
            handler.call(null);
        } catch (e) { }
    }

    /**
     * Merges `info` into the progress state and notifies the observers.
     * Also checks if the computation was aborted. If so, throws.
     *
     * @returns The promise produced by `Scheduler.immediatePromise()`.
     */
    update(info: Computation.ProgressUpdate): Promise<void> | void {
        const { message, abort, current, max } = info;
        this.checkAborted();
        const timestamp = Computation.now();

        // A non-boolean, truthy `abort` is a custom handler that replaces the current one.
        if (typeof abort !== 'boolean' && abort) this.abortRequester = abort;
        // Any falsy `abort` (including an omitted one) withdraws the abort option.
        this.progress.requestAbort = abort ? this.abortRequester : void 0;

        if (typeof message !== 'undefined') this.progress.message = message;
        this.progress.elapsedMs = timestamp - this.startedTime;

        // The global isNaN treats an omitted (undefined) value as "not a number".
        const hasCurrent = !isNaN(current!);
        this.progress.isIndeterminate = !hasCurrent;
        if (hasCurrent) {
            this.progress.current = current!;
            if (!isNaN(max!)) this.progress.max = max!;
        }

        const targets = this.observers;
        if (targets) {
            // Every observer receives the same snapshot, detached from the live state.
            const snapshot = { ...this.progress };
            const count = targets.length;
            for (let k = 0; k < count; k++) {
                Scheduler.immediate(targets[k], snapshot);
            }
        }

        this.lastUpdated = timestamp;
        return Scheduler.immediatePromise();
    }

    /**
     * True when more than `updateRate` has elapsed since the last update.
     * Also checks if the computation was aborted. If so, throws.
     */
    get requiresUpdate() {
        this.checkAborted();
        return !this.isSynchronous && Computation.now() - this.lastUpdated > this.updateRate;
    }

    /** Marks the start of a (possibly nested) computation; the outermost start resets the timers. */
    started() {
        if (this.level === 0) {
            const startedAt = Computation.now();
            this.startedTime = startedAt;
            this.lastUpdated = startedAt;
        }
        this.level++;
    }

    /** Marks the end of a computation; when the outermost one ends all observers are dropped. */
    finished() {
        const remaining = --this.level;
        if (remaining < 0) {
            throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
        }
        if (remaining === 0) this.observers = void 0;
    }
}
/**
 * Runs a computation in chunks and re-estimates the chunk size after each
 * progress update so that roughly one update rate's worth of work is done
 * between updates.
 */
class ChunkerImpl implements Computation.Chunker {
    /** Amount of work processed since the chunk size was last re-estimated. */
    private processedSinceUpdate = 0;
    /** `context.update`, pre-bound so it can be handed to the `update` callback. */
    private updater: Computation.Context['update'];

    constructor(public context: Computation.Context, private nextChunkSize: number) {
        this.updater = this.context.update.bind(this.context);
    }

    /**
     * Estimates how much work fits into one update interval, given that
     * `processedSinceUpdate` took `delta` milliseconds, and resets the counter.
     * With no measurable elapsed time the current chunk size is kept.
     */
    private computeChunkSize(delta: number) {
        const processed = this.processedSinceUpdate;
        this.processedSinceUpdate = 0;
        if (!delta) return this.nextChunkSize;

        const rate = (this.context as ObservableContext).updateRate || DefaulUpdateRateMs;
        return Math.round(processed * rate / delta + 1);
    }

    /** Size to request for the next chunk. */
    private getNextChunkSize() {
        // be smart if the computation is synchronous and process the whole chunk at once.
        return this.context.isSynchronous ? Number.MAX_SAFE_INTEGER : this.nextChunkSize;
    }

    /**
     * Stores the next chunk size: the fresh estimate for the first update of a
     * run, otherwise the estimate blended with the total work done so far.
     */
    private rescaleChunkSize(elapsedMs: number, totalSize: number, chunkCount: number, blend: boolean) {
        const estimate = this.computeChunkSize(elapsedMs);
        this.nextChunkSize = blend
            ? Math.round((totalSize + estimate) / (chunkCount + 1))
            : estimate;
    }

    setNextChunkSize(size: number) {
        this.nextChunkSize = size;
    }

    /**
     * Calls `nextChunk` repeatedly until it returns a value that is not
     * greater than zero, invoking `update` whenever the context asks for a
     * progress update (and once more after the loop if one is due).
     *
     * @param nextChunk Processes up to the given amount and returns how much was actually processed.
     * @param update Receives the context's bound `update` function; its result is awaited.
     * @param nextChunkSize Optional chunk size that overrides the stored one before starting.
     */
    async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) {
        if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
        this.processedSinceUpdate = 0;

        // track time for the actual computation and exclude the "update time"
        let chunkStart = Computation.now();
        let chunkCount = 0;
        let totalSize = 0;
        let updateCount = 0;

        for (; ;) {
            const processed = nextChunk(this.getNextChunkSize());
            if (!(processed > 0)) break;

            chunkCount++;
            this.processedSinceUpdate += processed;
            totalSize += processed;

            if (!this.context.requiresUpdate) continue;

            const elapsed = Computation.now() - chunkStart;
            await update(this.updater);
            this.rescaleChunkSize(elapsed, totalSize, chunkCount, updateCount > 0);
            updateCount++;
            chunkStart = Computation.now();
        }

        // Report once more if an update became due by the time the work ran out.
        if (this.context.requiresUpdate) {
            const elapsed = Computation.now() - chunkStart;
            await update(this.updater);
            this.rescaleChunkSize(elapsed, totalSize, chunkCount, updateCount > 0);
        }
    }
}
- export default Computation;
|