computation.ts 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /**
  2. * Copyright (c) 2017 molio contributors, licensed under MIT, See LICENSE file for more info.
  3. *
  4. * Adapted from https://github.com/dsehnal/LiteMol
  5. * @author David Sehnal <david.sehnal@gmail.com>
  6. */
  7. import Scheduler from './scheduler'
  8. interface Computation<A> {
  9. (ctx?: Computation.Context): Promise<A>
  10. }
  11. namespace Computation {
  12. export let PRINT_ERRORS_TO_CONSOLE = false;
  13. export function create<A>(computation: (ctx: Context) => Promise<A>) {
  14. return ComputationImpl(computation);
  15. }
  16. export function resolve<A>(a: A) {
  17. return create<A>(_ => Promise.resolve(a));
  18. }
  19. export function reject<A>(reason: any) {
  20. return create<A>(_ => Promise.reject(reason));
  21. }
  22. export interface Params {
  23. updateRateMs?: number,
  24. observer?: ProgressObserver
  25. }
  26. export const Aborted = 'Aborted';
  27. export interface Progress {
  28. message: string,
  29. isIndeterminate: boolean,
  30. current: number,
  31. max: number,
  32. elapsedMs: number,
  33. requestAbort?: () => void
  34. }
  35. export interface ProgressUpdate {
  36. message?: string,
  37. abort?: boolean | (() => void),
  38. current?: number,
  39. max?: number
  40. }
  41. export interface Context {
  42. readonly isSynchronous: boolean,
  43. /** Also checks if the computation was aborted. If so, throws. */
  44. readonly requiresUpdate: boolean,
  45. requestAbort(): void,
  46. subscribe(onProgress: ProgressObserver): { dispose: () => void },
  47. /** Also checks if the computation was aborted. If so, throws. */
  48. update(info: ProgressUpdate): Promise<void> | void
  49. }
  50. export type ProgressObserver = (progress: Readonly<Progress>) => void;
  51. const emptyDisposer = { dispose: () => { } }
  52. /** A context without updates. */
  53. export const synchronous: Context = {
  54. isSynchronous: true,
  55. requiresUpdate: false,
  56. requestAbort() { },
  57. subscribe(onProgress) { return emptyDisposer; },
  58. update(info) { }
  59. }
  60. export function observable(params?: Partial<Params>) {
  61. const ret = new ObservableContext(params && params.updateRateMs);
  62. if (params && params.observer) ret.subscribe(params.observer);
  63. return ret;
  64. }
  65. declare var process: any;
  66. declare var window: any;
  67. export const now: () => number = (function () {
  68. if (typeof window !== 'undefined' && window.performance) {
  69. const perf = window.performance;
  70. return () => perf.now();
  71. } else if (typeof process !== 'undefined' && process.hrtime !== 'undefined') {
  72. return () => {
  73. let t = process.hrtime();
  74. return t[0] * 1000 + t[1] / 1000000;
  75. };
  76. } else {
  77. return () => +new Date();
  78. }
  79. }());
  80. /** A utility for splitting large computations into smaller parts. */
  81. export interface Chunker {
  82. setNextChunkSize(size: number): void,
  83. /** nextChunk must return the number of actually processed chunks. */
  84. process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => void, nextChunkSize?: number): Promise<void>
  85. }
  86. export function chunker(ctx: Context, nextChunkSize: number): Chunker {
  87. return new ChunkerImpl(ctx, nextChunkSize);
  88. }
  89. }
  90. const DefaulUpdateRateMs = 150;
  91. function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> {
  92. return (ctx?: Computation.Context) => {
  93. const context: ObservableContext = ctx ? ctx : Computation.synchronous as any;
  94. return new Promise<A>(async (resolve, reject) => {
  95. try {
  96. if (context.started) context.started();
  97. const result = await computation(context);
  98. resolve(result);
  99. } catch (e) {
  100. if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e);
  101. reject(e);
  102. } finally {
  103. if (context.finished) context.finished();
  104. }
  105. });
  106. }
  107. }
  108. class ObservableContext implements Computation.Context {
  109. readonly updateRate: number;
  110. readonly isSynchronous: boolean = false;
  111. private level = 0;
  112. private startedTime = 0;
  113. private abortRequested = false;
  114. private lastUpdated = 0;
  115. private observers: Computation.ProgressObserver[] | undefined = void 0;
  116. private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 };
  117. lastDelta = 0;
  118. private checkAborted() {
  119. if (this.abortRequested) throw Computation.Aborted;
  120. }
  121. private abortRequester = () => { this.abortRequested = true };
  122. subscribe = (obs: Computation.ProgressObserver) => {
  123. if (!this.observers) this.observers = [];
  124. this.observers.push(obs);
  125. return {
  126. dispose: () => {
  127. if (!this.observers) return;
  128. for (let i = 0; i < this.observers.length; i++) {
  129. if (this.observers[i] === obs) {
  130. this.observers[i] = this.observers[this.observers.length - 1];
  131. this.observers.pop();
  132. return;
  133. }
  134. }
  135. }
  136. };
  137. }
  138. requestAbort() {
  139. try {
  140. if (this.abortRequester) {
  141. this.abortRequester.call(null);
  142. }
  143. } catch (e) { }
  144. }
  145. update({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void {
  146. this.checkAborted();
  147. const time = Computation.now();
  148. if (typeof abort === 'boolean') {
  149. this.progress.requestAbort = abort ? this.abortRequester : void 0;
  150. } else {
  151. if (abort) this.abortRequester = abort;
  152. this.progress.requestAbort = abort ? this.abortRequester : void 0;
  153. }
  154. if (typeof message !== 'undefined') this.progress.message = message;
  155. this.progress.elapsedMs = time - this.startedTime;
  156. if (isNaN(current!)) {
  157. this.progress.isIndeterminate = true;
  158. } else {
  159. this.progress.isIndeterminate = false;
  160. this.progress.current = current!;
  161. if (!isNaN(max!)) this.progress.max = max!;
  162. }
  163. if (this.observers) {
  164. const p = { ...this.progress };
  165. for (const o of this.observers) Scheduler.immediate(o, p);
  166. }
  167. this.lastDelta = time - this.lastUpdated;
  168. this.lastUpdated = time;
  169. return Scheduler.immediatePromise();
  170. }
  171. get requiresUpdate() {
  172. this.checkAborted();
  173. if (this.isSynchronous) return false;
  174. return Computation.now() - this.lastUpdated > this.updateRate;
  175. }
  176. started() {
  177. if (!this.level) {
  178. this.startedTime = Computation.now();
  179. this.lastUpdated = this.startedTime;
  180. }
  181. this.level++;
  182. }
  183. finished() {
  184. this.level--;
  185. if (this.level < 0) {
  186. throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
  187. }
  188. if (!this.level) this.observers = void 0;
  189. }
  190. constructor(updateRate?: number) {
  191. this.updateRate = updateRate || DefaulUpdateRateMs;
  192. }
  193. }
  194. class ChunkerImpl implements Computation.Chunker {
  195. private processedSinceUpdate = 0;
  196. private updater: Computation.Context['update'];
  197. private computeChunkSize() {
  198. const lastDelta = (this.context as ObservableContext).lastDelta || 0;
  199. if (!lastDelta) return this.nextChunkSize;
  200. const rate = (this.context as ObservableContext).updateRate || DefaulUpdateRateMs;
  201. const ret = Math.round(this.processedSinceUpdate * rate / lastDelta + 1);
  202. this.processedSinceUpdate = 0;
  203. return ret;
  204. }
  205. private getNextChunkSize() {
  206. const ctx = this.context as ObservableContext;
  207. // be smart if the computation is synchronous and process the whole chunk at once.
  208. if (ctx.isSynchronous) return Number.MAX_SAFE_INTEGER;
  209. return this.nextChunkSize;
  210. }
  211. setNextChunkSize(size: number) {
  212. this.nextChunkSize = size;
  213. }
  214. async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) {
  215. if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
  216. let lastChunkSize: number;
  217. while ((lastChunkSize = nextChunk(this.getNextChunkSize())) > 0) {
  218. this.processedSinceUpdate += lastChunkSize;
  219. if (this.context.requiresUpdate) {
  220. await update(this.updater);
  221. this.nextChunkSize = this.computeChunkSize();
  222. }
  223. }
  224. if (this.context.requiresUpdate) {
  225. await update(this.updater);
  226. this.nextChunkSize = this.computeChunkSize();
  227. }
  228. }
  229. constructor(public context: Computation.Context, private nextChunkSize: number) {
  230. this.updater = this.context.update.bind(this.context);
  231. }
  232. }
  233. export default Computation;