computation.ts 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. /**
  2. * Copyright (c) 2017 mol* contributors, licensed under MIT, See LICENSE file for more info.
  3. *
  4. * Adapted from https://github.com/dsehnal/LiteMol
  5. * @author David Sehnal <david.sehnal@gmail.com>
  6. */
  7. import Scheduler from './scheduler'
  8. import timeNow from './util/now'
  9. interface Computation<A> {
  10. (ctx?: Computation.Context): Promise<A>
  11. }
  12. namespace Computation {
  13. export let PRINT_ERRORS_TO_CONSOLE = false;
  14. export function create<A>(computation: (ctx: Context) => Promise<A>) {
  15. return ComputationImpl(computation);
  16. }
  17. export function resolve<A>(a: A) {
  18. return create<A>(_ => Promise.resolve(a));
  19. }
  20. export function reject<A>(reason: any) {
  21. return create<A>(_ => Promise.reject(reason));
  22. }
  23. export interface Params {
  24. updateRateMs?: number,
  25. observer?: ProgressObserver
  26. }
  27. export const Aborted = 'Aborted';
  28. export interface Progress {
  29. message: string,
  30. isIndeterminate: boolean,
  31. current: number,
  32. max: number,
  33. elapsedMs: number,
  34. requestAbort?: () => void
  35. }
  36. export interface ProgressUpdate {
  37. message?: string,
  38. abort?: boolean | (() => void),
  39. current?: number,
  40. max?: number
  41. }
  42. export interface Context {
  43. readonly isSynchronous: boolean,
  44. /** Also checks if the computation was aborted. If so, throws. */
  45. readonly requiresUpdate: boolean,
  46. requestAbort(): void,
  47. subscribe(onProgress: ProgressObserver): { dispose: () => void },
  48. /** Also checks if the computation was aborted. If so, throws. */
  49. update(info: ProgressUpdate): Promise<void> | void
  50. }
  51. export type ProgressObserver = (progress: Readonly<Progress>) => void;
  52. const emptyDisposer = { dispose: () => { } }
  53. /** A context without updates. */
  54. export const synchronous: Context = {
  55. isSynchronous: true,
  56. requiresUpdate: false,
  57. requestAbort() { },
  58. subscribe(onProgress) { return emptyDisposer; },
  59. update(info) { }
  60. }
  61. export function observable(params?: Partial<Params>) {
  62. const ret = new ObservableContext(params && params.updateRateMs);
  63. if (params && params.observer) ret.subscribe(params.observer);
  64. return ret;
  65. }
  66. export const now = timeNow;
  67. /** A utility for splitting large computations into smaller parts. */
  68. export interface Chunker {
  69. setNextChunkSize(size: number): void,
  70. /** nextChunk must return the number of actually processed chunks. */
  71. process(nextChunk: (chunkSize: number) => number, update: (updater: Context['update']) => void, nextChunkSize?: number): Promise<void>
  72. }
  73. export function chunker(ctx: Context, nextChunkSize: number): Chunker {
  74. return new ChunkerImpl(ctx, nextChunkSize);
  75. }
  76. }
  77. const DefaulUpdateRateMs = 150;
  78. function ComputationImpl<A>(computation: (ctx: Computation.Context) => Promise<A>): Computation<A> {
  79. return (ctx?: Computation.Context) => {
  80. const context: ObservableContext = ctx ? ctx : Computation.synchronous as any;
  81. return new Promise<A>(async (resolve, reject) => {
  82. try {
  83. if (context.started) context.started();
  84. const result = await computation(context);
  85. resolve(result);
  86. } catch (e) {
  87. if (Computation.PRINT_ERRORS_TO_CONSOLE) console.error(e);
  88. reject(e);
  89. } finally {
  90. if (context.finished) context.finished();
  91. }
  92. });
  93. }
  94. }
  95. class ObservableContext implements Computation.Context {
  96. readonly updateRate: number;
  97. readonly isSynchronous: boolean = false;
  98. private level = 0;
  99. private startedTime = 0;
  100. private abortRequested = false;
  101. private lastUpdated = 0;
  102. private observers: Computation.ProgressObserver[] | undefined = void 0;
  103. private progress: Computation.Progress = { message: 'Working...', current: 0, max: 0, elapsedMs: 0, isIndeterminate: true, requestAbort: void 0 };
  104. private checkAborted() {
  105. if (this.abortRequested) throw Computation.Aborted;
  106. }
  107. private abortRequester = () => { this.abortRequested = true };
  108. subscribe = (obs: Computation.ProgressObserver) => {
  109. if (!this.observers) this.observers = [];
  110. this.observers.push(obs);
  111. return {
  112. dispose: () => {
  113. if (!this.observers) return;
  114. for (let i = 0; i < this.observers.length; i++) {
  115. if (this.observers[i] === obs) {
  116. this.observers[i] = this.observers[this.observers.length - 1];
  117. this.observers.pop();
  118. return;
  119. }
  120. }
  121. }
  122. };
  123. }
  124. requestAbort() {
  125. try {
  126. if (this.abortRequester) {
  127. this.abortRequester.call(null);
  128. }
  129. } catch (e) { }
  130. }
  131. update({ message, abort, current, max }: Computation.ProgressUpdate): Promise<void> | void {
  132. this.checkAborted();
  133. const time = Computation.now();
  134. if (typeof abort === 'boolean') {
  135. this.progress.requestAbort = abort ? this.abortRequester : void 0;
  136. } else {
  137. if (abort) this.abortRequester = abort;
  138. this.progress.requestAbort = abort ? this.abortRequester : void 0;
  139. }
  140. if (typeof message !== 'undefined') this.progress.message = message;
  141. this.progress.elapsedMs = time - this.startedTime;
  142. if (isNaN(current!)) {
  143. this.progress.isIndeterminate = true;
  144. } else {
  145. this.progress.isIndeterminate = false;
  146. this.progress.current = current!;
  147. if (!isNaN(max!)) this.progress.max = max!;
  148. }
  149. if (this.observers) {
  150. const p = { ...this.progress };
  151. for (let i = 0, _i = this.observers.length; i < _i; i++) {
  152. Scheduler.immediate(this.observers[i], p);
  153. }
  154. }
  155. this.lastUpdated = time;
  156. return Scheduler.immediatePromise();
  157. }
  158. get requiresUpdate() {
  159. this.checkAborted();
  160. if (this.isSynchronous) return false;
  161. return Computation.now() - this.lastUpdated > this.updateRate;
  162. }
  163. started() {
  164. if (!this.level) {
  165. this.startedTime = Computation.now();
  166. this.lastUpdated = this.startedTime;
  167. }
  168. this.level++;
  169. }
  170. finished() {
  171. this.level--;
  172. if (this.level < 0) {
  173. throw new Error('Bug in code somewhere, Computation.resolve/reject called too many times.');
  174. }
  175. if (!this.level) this.observers = void 0;
  176. }
  177. constructor(updateRate?: number) {
  178. this.updateRate = updateRate || DefaulUpdateRateMs;
  179. }
  180. }
  181. class ChunkerImpl implements Computation.Chunker {
  182. private processedSinceUpdate = 0;
  183. private updater: Computation.Context['update'];
  184. private computeChunkSize(delta: number) {
  185. if (!delta) {
  186. this.processedSinceUpdate = 0;
  187. return this.nextChunkSize;
  188. }
  189. const rate = (this.context as ObservableContext).updateRate || DefaulUpdateRateMs;
  190. const ret = Math.round(this.processedSinceUpdate * rate / delta + 1);
  191. this.processedSinceUpdate = 0;
  192. return ret;
  193. }
  194. private getNextChunkSize() {
  195. const ctx = this.context as ObservableContext;
  196. // be smart if the computation is synchronous and process the whole chunk at once.
  197. if (ctx.isSynchronous) return Number.MAX_SAFE_INTEGER;
  198. return this.nextChunkSize;
  199. }
  200. setNextChunkSize(size: number) {
  201. this.nextChunkSize = size;
  202. }
  203. async process(nextChunk: (size: number) => number, update: (updater: Computation.Context['update']) => Promise<void> | void, nextChunkSize?: number) {
  204. if (typeof nextChunkSize !== 'undefined') this.setNextChunkSize(nextChunkSize);
  205. this.processedSinceUpdate = 0;
  206. // track time for the actual computation and exclude the "update time"
  207. let chunkStart = Computation.now();
  208. let lastChunkSize: number;
  209. let chunkCount = 0;
  210. let totalSize = 0;
  211. let updateCount = 0;
  212. while ((lastChunkSize = nextChunk(this.getNextChunkSize())) > 0) {
  213. chunkCount++;
  214. this.processedSinceUpdate += lastChunkSize;
  215. totalSize += lastChunkSize;
  216. if (this.context.requiresUpdate) {
  217. let time = Computation.now();
  218. await update(this.updater);
  219. this.nextChunkSize = updateCount > 0
  220. ? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1))
  221. : this.computeChunkSize(time - chunkStart)
  222. updateCount++;
  223. chunkStart = Computation.now();
  224. }
  225. }
  226. if (this.context.requiresUpdate) {
  227. let time = Computation.now();
  228. await update(this.updater);
  229. this.nextChunkSize = updateCount > 0
  230. ? Math.round((totalSize + this.computeChunkSize(time - chunkStart)) / (chunkCount + 1))
  231. : this.computeChunkSize(time - chunkStart)
  232. }
  233. }
  234. constructor(public context: Computation.Context, private nextChunkSize: number) {
  235. this.updater = this.context.update.bind(this.context);
  236. }
  237. }
  238. export default Computation;