|
@@ -4,19 +4,28 @@
|
|
* @author David Sehnal <david.sehnal@gmail.com>
|
|
* @author David Sehnal <david.sehnal@gmail.com>
|
|
*/
|
|
*/
|
|
|
|
|
|
-import Task from '../task'
|
|
|
|
-import RuntimeContext from './runtime-context'
|
|
|
|
-import Progress from './progress'
|
|
|
|
-import now from '../util/now'
|
|
|
|
-import ImmediateScheduler from '../scheduler/immediate'
|
|
|
|
|
|
+import { Task } from '../task'
|
|
|
|
+import { RuntimeContext } from './runtime-context'
|
|
|
|
+import { Progress } from './progress'
|
|
|
|
+import { now } from '../util/now'
|
|
|
|
+import { Scheduler } from '../util/scheduler'
|
|
|
|
|
|
-function defaultProgress(rootTaskId: number, task: Task<any>): Task.Progress {
|
|
|
|
|
|
+function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) {
|
|
|
|
+ const info = ProgressInfo(task, observer, updateRateMs);
|
|
|
|
+ const ctx = new ObservableRuntimeContext(info, info.root);
|
|
|
|
+ return runRoot(task, ctx);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+namespace ExecuteObservable {
|
|
|
|
+ export let PRINT_ERRORS_TO_STD_ERR = false;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+function defaultProgress(task: Task<any>): Task.Progress {
|
|
return {
|
|
return {
|
|
- rootTaskId,
|
|
|
|
taskId: task.id,
|
|
taskId: task.id,
|
|
taskName: task.name,
|
|
taskName: task.name,
|
|
- message: 'Running...',
|
|
|
|
- elapsedMs: { real: 0, cpu: 0 },
|
|
|
|
|
|
+ message: '',
|
|
|
|
+ startedTime: 0,
|
|
canAbort: true,
|
|
canAbort: true,
|
|
isIndeterminate: true,
|
|
isIndeterminate: true,
|
|
current: 0,
|
|
current: 0,
|
|
@@ -26,7 +35,7 @@ function defaultProgress(rootTaskId: number, task: Task<any>): Task.Progress {
|
|
|
|
|
|
interface ProgressInfo {
|
|
interface ProgressInfo {
|
|
updateRateMs: number,
|
|
updateRateMs: number,
|
|
- lastUpdated: number,
|
|
|
|
|
|
+ lastNotified: number,
|
|
observer: Progress.Observer,
|
|
observer: Progress.Observer,
|
|
|
|
|
|
abortToken: { abortRequested: boolean, reason: string },
|
|
abortToken: { abortRequested: boolean, reason: string },
|
|
@@ -40,16 +49,16 @@ function ProgressInfo(task: Task<any>, observer: Progress.Observer, updateRateMs
|
|
|
|
|
|
return {
|
|
return {
|
|
updateRateMs,
|
|
updateRateMs,
|
|
- lastUpdated: now(),
|
|
|
|
|
|
+ lastNotified: now(),
|
|
observer,
|
|
observer,
|
|
abortToken,
|
|
abortToken,
|
|
taskId: task.id,
|
|
taskId: task.id,
|
|
- root: { progress: defaultProgress(task.id, task), children: [] },
|
|
|
|
- tryAbort: abortFn(abortToken)
|
|
|
|
|
|
+ root: { progress: defaultProgress(task), children: [] },
|
|
|
|
+ tryAbort: createAbortFunction(abortToken)
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
|
|
-function abortFn(token: ProgressInfo['abortToken']) {
|
|
|
|
|
|
+function createAbortFunction(token: ProgressInfo['abortToken']) {
|
|
return (reason?: string) => {
|
|
return (reason?: string) => {
|
|
token.abortRequested = true;
|
|
token.abortRequested = true;
|
|
token.reason = reason || token.reason;
|
|
token.reason = reason || token.reason;
|
|
@@ -57,7 +66,7 @@ function abortFn(token: ProgressInfo['abortToken']) {
|
|
}
|
|
}
|
|
|
|
|
|
function cloneTree(root: Progress.Node): Progress.Node {
|
|
function cloneTree(root: Progress.Node): Progress.Node {
|
|
- return { progress: { ...root.progress, elapsedMs: { ...root.progress.elapsedMs } }, children: root.children.map(cloneTree) };
|
|
|
|
|
|
+ return { progress: { ...root.progress }, children: root.children.map(cloneTree) };
|
|
}
|
|
}
|
|
|
|
|
|
function canAbort(root: Progress.Node): boolean {
|
|
function canAbort(root: Progress.Node): boolean {
|
|
@@ -65,12 +74,11 @@ function canAbort(root: Progress.Node): boolean {
|
|
}
|
|
}
|
|
|
|
|
|
function snapshotProgress(info: ProgressInfo): Progress {
|
|
function snapshotProgress(info: ProgressInfo): Progress {
|
|
- return { root: cloneTree(info.root), canAbort: canAbort(info.root), tryAbort: info.tryAbort };
|
|
|
|
|
|
+ return { root: cloneTree(info.root), canAbort: canAbort(info.root), requestAbort: info.tryAbort };
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
|
|
async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
|
|
- ctx.started = now();
|
|
|
|
|
|
+ ctx.node.progress.startedTime = now();
|
|
if (!task.__onAbort) return task.__f(ctx);
|
|
if (!task.__onAbort) return task.__f(ctx);
|
|
try {
|
|
try {
|
|
const ret = await task.__f(ctx);
|
|
const ret = await task.__f(ctx);
|
|
@@ -79,6 +87,7 @@ async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
|
|
// }
|
|
// }
|
|
return ret;
|
|
return ret;
|
|
} catch (e) {
|
|
} catch (e) {
|
|
|
|
+ // TODO: track cancellation
|
|
if (Task.isAborted(e)) task.__onAbort();
|
|
if (Task.isAborted(e)) task.__onAbort();
|
|
if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e);
|
|
if (ExecuteObservable.PRINT_ERRORS_TO_STD_ERR) console.error(e);
|
|
throw e;
|
|
throw e;
|
|
@@ -86,7 +95,7 @@ async function runRoot<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
|
|
}
|
|
}
|
|
|
|
|
|
async function run<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
|
|
async function run<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
|
|
- ctx.started = now();
|
|
|
|
|
|
+ ctx.node.progress.startedTime = now();
|
|
if (!task.__onAbort) return task.__f(ctx);
|
|
if (!task.__onAbort) return task.__f(ctx);
|
|
try {
|
|
try {
|
|
const ret = await task.__f(ctx);
|
|
const ret = await task.__f(ctx);
|
|
@@ -101,12 +110,21 @@ async function run<T>(task: Task<T>, ctx: ObservableRuntimeContext) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
|
|
+function shouldNotify(info: ProgressInfo, time: number) {
|
|
|
|
+ return time - info.lastNotified > info.updateRateMs;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+function notifyObserver(info: ProgressInfo, time: number) {
|
|
|
|
+ info.lastNotified = time;
|
|
|
|
+ const snapshot = snapshotProgress(info);
|
|
|
|
+ info.observer(snapshot);
|
|
|
|
+}
|
|
|
|
+
|
|
class ObservableRuntimeContext implements RuntimeContext {
|
|
class ObservableRuntimeContext implements RuntimeContext {
|
|
isExecuting = true;
|
|
isExecuting = true;
|
|
- elapsedCpuMs: number;
|
|
|
|
- lastScheduledTime: number;
|
|
|
|
|
|
+ lastUpdatedTime = 0;
|
|
|
|
|
|
- started: number = 0;
|
|
|
|
node: Progress.Node;
|
|
node: Progress.Node;
|
|
info: ProgressInfo;
|
|
info: ProgressInfo;
|
|
|
|
|
|
@@ -116,12 +134,12 @@ class ObservableRuntimeContext implements RuntimeContext {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- get needsYield(): boolean {
|
|
|
|
|
|
+ get shouldUpdate(): boolean {
|
|
this.checkAborted();
|
|
this.checkAborted();
|
|
- return now() - this.info.lastUpdated > this.info.updateRateMs;
|
|
|
|
|
|
+ return now() - this.lastUpdatedTime > this.info.updateRateMs;
|
|
}
|
|
}
|
|
|
|
|
|
- private setProgress(update?: string | Partial<RuntimeContext.ProgressUpdate>) {
|
|
|
|
|
|
+ private updateProgress(update?: string | Partial<RuntimeContext.ProgressUpdate>) {
|
|
this.checkAborted();
|
|
this.checkAborted();
|
|
|
|
|
|
if (!update) return;
|
|
if (!update) return;
|
|
@@ -138,57 +156,49 @@ class ObservableRuntimeContext implements RuntimeContext {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private resume = () => {
|
|
|
|
- this.isExecuting = true;
|
|
|
|
- this.lastScheduledTime = now();
|
|
|
|
- }
|
|
|
|
|
|
+ update(progress?: string | Partial<RuntimeContext.ProgressUpdate>, dontNotify?: boolean): Promise<void> | void {
|
|
|
|
+ // The progress tracking and observer notification are separated
|
|
|
|
+ // because the computation can have a tree structure.
|
|
|
|
+ // All nodes of the tree should be regualarly updated at the specified frequency,
|
|
|
|
+ // however, the notification should only be invoked once per the whole tree.
|
|
|
|
|
|
- updateProgress(progress?: string | Partial<RuntimeContext.ProgressUpdate>) {
|
|
|
|
- this.setProgress(progress);
|
|
|
|
- }
|
|
|
|
|
|
+ this.lastUpdatedTime = now();
|
|
|
|
+ this.updateProgress(progress);
|
|
|
|
|
|
- yield(progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<void> {
|
|
|
|
- this.isExecuting = false;
|
|
|
|
- this.setProgress(progress);
|
|
|
|
- this.info.lastUpdated = now();
|
|
|
|
- const snapshot = snapshotProgress(this.info);
|
|
|
|
- this.info.observer(snapshot);
|
|
|
|
- return ImmediateScheduler.last(this.resume);
|
|
|
|
|
|
+ if (!!dontNotify || !shouldNotify(this.info, this.lastUpdatedTime)) return;
|
|
|
|
+
|
|
|
|
+ notifyObserver(this.info, this.lastUpdatedTime);
|
|
|
|
+ return Scheduler.immediatePromise();
|
|
}
|
|
}
|
|
|
|
|
|
async runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> {
|
|
async runChild<T>(task: Task<T>, progress?: string | Partial<RuntimeContext.ProgressUpdate>): Promise<T> {
|
|
- this.setProgress(progress);
|
|
|
|
- const node: Progress.Node = { progress: defaultProgress(this.info.taskId, task), children: [] };
|
|
|
|
|
|
+ this.updateProgress(progress);
|
|
|
|
+
|
|
|
|
+ // Create a new child context and add it to the progress tree.
|
|
|
|
+ // When the child task finishes, remove the tree node.
|
|
|
|
+
|
|
|
|
+ const node: Progress.Node = { progress: defaultProgress(task), children: [] };
|
|
const children = this.node.children as Progress.Node[];
|
|
const children = this.node.children as Progress.Node[];
|
|
children.push(node);
|
|
children.push(node);
|
|
- const ctx = new ObservableRuntimeContext(task, this.info, node);
|
|
|
|
|
|
+ const ctx = new ObservableRuntimeContext(this.info, node);
|
|
try {
|
|
try {
|
|
return await run(task, ctx);
|
|
return await run(task, ctx);
|
|
} finally {
|
|
} finally {
|
|
// remove the progress node after the computation has finished.
|
|
// remove the progress node after the computation has finished.
|
|
const idx = children.indexOf(node);
|
|
const idx = children.indexOf(node);
|
|
if (idx >= 0) {
|
|
if (idx >= 0) {
|
|
- children[idx] = children[children.length - 1];
|
|
|
|
|
|
+ for (let i = idx, _i = children.length - 1; i < _i; i++) {
|
|
|
|
+ children[i] = children[i + 1];
|
|
|
|
+ }
|
|
children.pop();
|
|
children.pop();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- constructor(task: Task<any>, info: ProgressInfo, node: Progress.Node) {
|
|
|
|
- this.lastScheduledTime = this.started;
|
|
|
|
|
|
+ constructor(info: ProgressInfo, node: Progress.Node) {
|
|
this.node = node;
|
|
this.node = node;
|
|
this.info = info;
|
|
this.info = info;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-function ExecuteObservable<T>(task: Task<T>, observer: Progress.Observer, updateRateMs = 250) {
|
|
|
|
- const info = ProgressInfo(task, observer, updateRateMs);
|
|
|
|
- const ctx = new ObservableRuntimeContext(task, info, info.root);
|
|
|
|
- return runRoot(task, ctx);
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-namespace ExecuteObservable {
|
|
|
|
- export let PRINT_ERRORS_TO_STD_ERR = false;
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-export default ExecuteObservable
|
|
|
|
|
|
+export { ExecuteObservable }
|