
ModelServer job queue

David Sehnal, 6 years ago
commit e8ddbc0de9

+ 1 - 1
src/mol-model/structure/export/categories/atom_site.ts

@@ -39,7 +39,7 @@ const atom_site_fields: CifField<StructureElement>[] = [
 
     CifField.int('pdbx_PDB_model_num', P.unit.model_num, { encoder: E.deltaRLE }),
     CifField.str<StructureElement, Structure>('operator_name', P.unit.operator_name, {
-        shouldInclude: structure => { console.log(!!structure); return structure.units.some(u => !u.conformation.operator.isIdentity) }
+        shouldInclude: structure => structure.units.some(u => !u.conformation.operator.isIdentity)
     })
 ];
 

+ 0 - 3
src/mol-model/structure/model/properties/utils/atomic-ranges.ts

@@ -65,9 +65,6 @@ export function getAtomicRanges(data: AtomicData, segments: AtomicSegments, chem
         }
     }
 
-    console.log('polymerRanges', polymerRanges)
-    console.log('gapRanges', gapRanges)
-
     return {
         polymerRanges: SortedRanges.ofSortedRanges(polymerRanges as ElementIndex[]),
         gapRanges: SortedRanges.ofSortedRanges(gapRanges as ElementIndex[])

+ 0 - 3
src/mol-model/structure/model/properties/utils/coarse-ranges.ts

@@ -22,7 +22,6 @@ export function getCoarseRanges(data: CoarseElementData, chemicalComponentMap: M
 
     while (chainIt.hasNext) {
         const { start, end } = chainIt.move();
-        console.log('chain', start, end)
 
         let startIndex = -1
         let prevSeqEnd = -1
@@ -45,8 +44,6 @@ export function getCoarseRanges(data: CoarseElementData, chemicalComponentMap: M
         }
     }
 
-    console.log(polymerRanges, gapRanges)
-
     return {
         polymerRanges: SortedRanges.ofSortedRanges(polymerRanges as ElementIndex[]),
         gapRanges: SortedRanges.ofSortedRanges(gapRanges as ElementIndex[])

+ 4 - 1
src/servers/model/config.ts

@@ -44,9 +44,12 @@ const config = {
      */
     maxQueryTimeInMs: 5 * 1000,
 
+    /** Maximum number of requests before "server busy" */
+    maxQueueLength: 30,
+
     /**
      * Maps a request identifier to a filename.
-     * 
+     *
      * @param source
      *   Source of the data.
      * @param id

+ 9 - 4
src/servers/model/server/cache.ts

@@ -48,18 +48,24 @@ export class Cache<T> {
     private refresh(e: CacheNode<T>) {
         this.clearTimeout(e);
 
-        e.value.timeoutId = setTimeout(() => this.expire(e), ServerConfig.cacheParams.entryTimeoutInMs);
+        e.value.timeoutId = setTimeout(() => this.expireNode(e), ServerConfig.cacheParams.entryTimeoutInMs);
         this.entries.remove(e);
         this.entries.addFirst(e.value);
     }
 
-    private expire(e: CacheNode<T>, notify = true) {
+    private expireNode(e: CacheNode<T>, notify = true) {
         if (notify) ConsoleLogger.log('Cache', `${e.value.key} expired.`);
         this.dispose(e);
     }
 
     expireAll() {
-        for (let e = this.entries.first; e; e = e.next) this.expire(e, false);
+        for (let e = this.entries.first; e; e = e.next) this.expireNode(e, false);
+    }
+
+    expire(key: string) {
+        const entry = this.entryMap.get(key);
+        if (!entry) return;
+        this.expireNode(entry);
     }
 
     add(item: T) {
@@ -86,7 +92,6 @@ export class Cache<T> {
         return this.entryMap.has(key);
     }
 
-
     get(key: string) {
         if (!this.entryMap.has(key)) return void 0;
         let e = this.entryMap.get(key)!;
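
Note: the new expire(key) evicts a single cache entry on demand instead of waiting for its timeout. A minimal usage sketch, assuming the exported StructureCache instance from structure-wrapper.ts and the sourceId/entryId key format used by the jobs in this commit:

    import { StructureCache } from './structure-wrapper';

    // Evict one cached structure by its key; does nothing if the key is not present.
    StructureCache.expire('pdb/1cbs');

    // expireAll() still clears every entry, e.g. on shutdown (test.ts calls it in a finally block).
    StructureCache.expireAll();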

+ 85 - 0
src/servers/model/server/jobs.ts

@@ -0,0 +1,85 @@
+/**
+ * Copyright (c) 2018 mol* contributors, licensed under MIT, See LICENSE file for more info.
+ *
+ * @author David Sehnal <david.sehnal@gmail.com>
+ */
+
+import { UUID } from 'mol-util';
+import { getQueryByName, normalizeQueryParams, QueryDefinition } from './api';
+import { LinkedList } from 'mol-data/generic';
+
+export interface ResponseFormat {
+    isBinary: boolean
+}
+
+export interface Job {
+    id: UUID,
+    datetime_utc: string,
+
+    sourceId: '_local_' | string,
+    entryId: string,
+    key: string,
+
+    queryDefinition: QueryDefinition,
+    normalizedParams: any,
+    responseFormat: ResponseFormat
+}
+
+export function createJob(sourceId: '_local_' | string, entryId: string, queryName: string, params: any): Job {
+    const queryDefinition = getQueryByName(queryName);
+    if (!queryDefinition) throw new Error(`Query '${queryName}' is not supported.`);
+
+    const normalizedParams = normalizeQueryParams(queryDefinition, params);
+
+    return {
+        id: UUID.create(),
+        datetime_utc: `${new Date().toISOString().replace(/T/, ' ').replace(/\..+/, '')}`,
+        key: `${sourceId}/${entryId}`,
+        sourceId,
+        entryId,
+        queryDefinition,
+        normalizedParams,
+        responseFormat: { isBinary: !!params.binary }
+    };
+}
+
+class _JobQueue {
+    private list: LinkedList<Job> = LinkedList();
+
+    get size() {
+        return this.list.count;
+    }
+
+    add(sourceId: '_local_' | string, entryId: string, queryName: string, params: any) {
+        const job = createJob(sourceId, entryId, queryName, params);
+        this.list.addLast(job);
+        return job.id;
+    }
+
+    hasNext(): boolean {
+        return this.list.count > 0;
+    }
+
+    getNext(): Job {
+        return this.list.removeFirst()!;
+    }
+
+    /** Sort the job list by key = sourceId/entryId */
+    sort() {
+        if (this.list.count === 0) return;
+
+        const jobs: Job[] = [];
+        for (let j = this.list.first; !!j; j = j.next) {
+            jobs[jobs.length] = j.value;
+        }
+
+        jobs.sort((a, b) => a.key < b.key ? -1 : 1);
+
+        this.list = LinkedList();
+        for (const j of jobs) {
+            this.list.addLast(j);
+        }
+    }
+}
+
+export const JobManager = new _JobQueue();
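
A rough sketch of how the queue is intended to be driven (the real consumer is processNextJob in web-api.ts below; sort() is not yet called anywhere in this commit):

    import { JobManager } from './jobs';
    import { resolveJob } from './query';
    import Writer from 'mol-io/writer/writer';

    // Enqueue a query; the returned UUID identifies the job (web-api.ts maps it to an express Response).
    const jobId = JobManager.add('pdb', '1cbs', 'residueInteraction', { label_comp_id: 'REA' });

    // Group pending jobs by key (sourceId/entryId) so requests for the same entry run back to back
    // and can reuse the cached structure.
    JobManager.sort();

    // Drain the queue one job at a time.
    async function drain(writer: Writer) {
        while (JobManager.hasNext()) {
            await resolveJob(JobManager.getNext(), writer);
        }
    }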

+ 35 - 70
src/servers/model/server/query.ts

@@ -4,35 +4,19 @@
  * @author David Sehnal <david.sehnal@gmail.com>
  */
 
-import { UUID } from 'mol-util';
-import { getQueryByName, normalizeQueryParams, QueryDefinition } from './api';
-import { getStructure, StructureWrapper } from './structure-wrapper';
-import Config from '../config';
-import { Progress, now } from 'mol-task';
-import { ConsoleLogger } from 'mol-util/console-logger';
+import { Column } from 'mol-data/db';
+import { CifWriter } from 'mol-io/writer/cif';
 import Writer from 'mol-io/writer/writer';
-import { CifWriter } from 'mol-io/writer/cif'
+import { StructureQuery, StructureSelection } from 'mol-model/structure';
 import { encode_mmCIF_categories } from 'mol-model/structure/export/mmcif';
-import { StructureSelection, StructureQuery } from 'mol-model/structure';
-import Version from '../version'
-import { Column } from 'mol-data/db';
+import { now, Progress } from 'mol-task';
+import { ConsoleLogger } from 'mol-util/console-logger';
 import { PerformanceMonitor } from 'mol-util/performance-monitor';
-
-export interface ResponseFormat {
-    isBinary: boolean
-}
-
-export interface Request {
-    id: UUID,
-    datetime_utc: string,
-
-    sourceId: '_local_' | string,
-    entryId: string,
-
-    queryDefinition: QueryDefinition,
-    normalizedParams: any,
-    responseFormat: ResponseFormat
-}
+import Config from '../config';
+import Version from '../version';
+import { Job } from './jobs';
+import { getStructure, StructureWrapper } from './structure-wrapper';
+import CifField = CifWriter.Field
 
 export interface Stats {
     structure: StructureWrapper,
@@ -40,53 +24,36 @@ export interface Stats {
     encodeTimeMs: number
 }
 
-export function createRequest(sourceId: '_local_' | string, entryId: string, queryName: string, params: any): Request {
-    const queryDefinition = getQueryByName(queryName);
-    if (!queryDefinition) throw new Error(`Query '${queryName}' is not supported.`);
-
-    const normalizedParams = normalizeQueryParams(queryDefinition, params);
-
-    return {
-        id: UUID.create(),
-        datetime_utc: `${new Date().toISOString().replace(/T/, ' ').replace(/\..+/, '')}`,
-        sourceId,
-        entryId,
-        queryDefinition,
-        normalizedParams,
-        responseFormat: { isBinary: !!params.binary }
-    };
-}
-
 const perf = new PerformanceMonitor();
 
-export async function resolveRequest(req: Request, writer: Writer) {
-    ConsoleLogger.logId(req.id, 'Query', 'Starting.');
+export async function resolveJob(job: Job, writer: Writer) {
+    ConsoleLogger.logId(job.id, 'Query', 'Starting.');
 
-    const wrappedStructure = await getStructure(req.sourceId, req.entryId);
+    const wrappedStructure = await getStructure(job);
 
     perf.start('query');
-    const structure = req.queryDefinition.structureTransform
-        ? await req.queryDefinition.structureTransform(req.normalizedParams, wrappedStructure.structure)
+    const structure = job.queryDefinition.structureTransform
+        ? await job.queryDefinition.structureTransform(job.normalizedParams, wrappedStructure.structure)
         : wrappedStructure.structure;
-    const query = req.queryDefinition.query(req.normalizedParams, structure);
+    const query = job.queryDefinition.query(job.normalizedParams, structure);
     const result = StructureSelection.unionStructure(StructureQuery.run1(query, structure));
     perf.end('query');
 
-    ConsoleLogger.logId(req.id, 'Query', 'Query finished.');
+    ConsoleLogger.logId(job.id, 'Query', 'Query finished.');
 
-    const encoder = CifWriter.createEncoder({ binary: req.responseFormat.isBinary, encoderName: `ModelServer ${Version}` });
+    const encoder = CifWriter.createEncoder({ binary: job.responseFormat.isBinary, encoderName: `ModelServer ${Version}` });
 
     perf.start('encode');
     encoder.startDataBlock(structure.units[0].model.label.toUpperCase());
-    encoder.writeCategory(_model_server_result, [req]);
-    encoder.writeCategory(_model_server_params, [req]);
+    encoder.writeCategory(_model_server_result, [job]);
+    encoder.writeCategory(_model_server_params, [job]);
 
     // encoder.setFilter(mmCIF_Export_Filters.onlyPositions);
     encode_mmCIF_categories(encoder, result);
     // encoder.setFilter();
     perf.end('encode');
 
-    ConsoleLogger.logId(req.id, 'Query', 'Encoded.');
+    ConsoleLogger.logId(job.id, 'Query', 'Encoded.');
 
     const stats: Stats = {
         structure: wrappedStructure,
@@ -99,7 +66,7 @@ export async function resolveRequest(req: Request, writer: Writer) {
 
     encoder.writeTo(writer);
 
-    ConsoleLogger.logId(req.id, 'Query', 'Written.');
+    ConsoleLogger.logId(job.id, 'Query', 'Written.');
 }
 
 const maxTime = Config.maxQueryTimeInMs;
@@ -109,8 +76,6 @@ export function abortingObserver(p: Progress) {
     }
 }
 
-import CifField = CifWriter.Field
-
 function string<T>(name: string, str: (data: T, i: number) => string, isSpecified?: (data: T) => boolean): CifField<number, T> {
     if (isSpecified) {
         return CifField.str(name, (i, d) => str(d, i), { valueKind: (i, d) => isSpecified(d) ? Column.ValueKind.Present : Column.ValueKind.NotPresent });
@@ -122,13 +87,13 @@ function int32<T>(name: string, value: (data: T) => number): CifField<number, T>
     return CifField.int(name, (i, d) => value(d));
 }
 
-const _model_server_result_fields: CifField<number, Request>[] = [
-    string<Request>('request_id', ctx => '' + ctx.id),
-    string<Request>('datetime_utc', ctx => ctx.datetime_utc),
-    string<Request>('server_version', ctx => Version),
-    string<Request>('query_name', ctx => ctx.queryDefinition.name),
-    string<Request>('source_id', ctx => ctx.sourceId),
-    string<Request>('entry_id', ctx => ctx.entryId),
+const _model_server_result_fields: CifField<number, Job>[] = [
+    string<Job>('job_id', ctx => '' + ctx.id),
+    string<Job>('datetime_utc', ctx => ctx.datetime_utc),
+    string<Job>('server_version', ctx => Version),
+    string<Job>('query_name', ctx => ctx.queryDefinition.name),
+    string<Job>('source_id', ctx => ctx.sourceId),
+    string<Job>('entry_id', ctx => ctx.entryId),
 ];
 
 const _model_server_params_fields: CifField<number, string[]>[] = [
@@ -145,17 +110,17 @@ const _model_server_stats_fields: CifField<number, Stats>[] = [
 ];
 
 
-const _model_server_result: CifWriter.Category<Request> = {
+const _model_server_result: CifWriter.Category<Job> = {
     name: 'model_server_result',
-    instance: (request) => ({ data: request, fields: _model_server_result_fields, rowCount: 1 })
+    instance: (job) => ({ data: job, fields: _model_server_result_fields, rowCount: 1 })
 };
 
-const _model_server_params: CifWriter.Category<Request> = {
+const _model_server_params: CifWriter.Category<Job> = {
     name: 'model_server_params',
-    instance(request) {
+    instance(job) {
         const params: string[][] = [];
-        for (const k of Object.keys(request.normalizedParams)) {
-            params.push([k, '' + request.normalizedParams[k]]);
+        for (const k of Object.keys(job.normalizedParams)) {
+            params.push([k, '' + job.normalizedParams[k]]);
         }
         return {
             data: params,

+ 6 - 5
src/servers/model/server/structure-wrapper.ts

@@ -12,6 +12,7 @@ import CIF from 'mol-io/reader/cif'
 import * as util from 'util'
 import * as fs from 'fs'
 import * as zlib from 'zlib'
+import { Job } from './jobs';
 
 require('util.promisify').shim();
 
@@ -38,13 +39,12 @@ export class StructureWrapper {
     structure: Structure;
 }
 
-export async function getStructure(sourceId: '_local_' | string, entryId: string): Promise<StructureWrapper> {
-    const key = `${sourceId}/${entryId}`;
+export async function getStructure(job: Job): Promise<StructureWrapper> {
     if (Config.cacheParams.useCache) {
-        const ret = StructureCache.get(key);
+        const ret = StructureCache.get(job.key);
         if (ret) return ret;
     }
-    const ret = await readStructure(key, sourceId, entryId);
+    const ret = await readStructure(job.key, job.sourceId, job.entryId);
     if (Config.cacheParams.useCache) {
         StructureCache.add(ret);
     }
@@ -83,7 +83,8 @@ async function parseCif(data: string|Uint8Array) {
 
 async function readStructure(key: string, sourceId: string, entryId: string) {
     const filename = sourceId === '_local_' ? entryId : Config.mapFile(sourceId, entryId);
-    if (!filename) throw new Error(`Entry '${key}' not found.`);
+    if (!filename) throw new Error(`Could not map '${key}' to a valid filename.`);
+    if (!fs.existsSync(filename)) throw new Error(`Could not map '${key}' to an existing file.`);
 
     perf.start('read');
     const data = await readFile(filename);
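
getStructure now takes the whole Job and keys the cache on job.key (sourceId/entryId), so repeated queries against the same entry reuse the already-parsed structure. A minimal sketch, assuming Config.mapFile resolves 'pdb'/'1cbs' to an existing file (otherwise one of the two new error paths above is hit):

    import { createJob } from './jobs';
    import { getStructure } from './structure-wrapper';

    async function example() {
        const job = createJob('pdb', '1cbs', 'residueInteraction', { label_comp_id: 'REA' });
        // First call reads and parses the file; later calls sharing the 'pdb/1cbs' key hit the cache.
        const wrapped = await getStructure(job);
        return wrapped.structure;
    }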

+ 39 - 8
src/servers/model/server/web-api.ts

@@ -8,7 +8,9 @@ import * as express from 'express';
 import Config from '../config';
 import { QueryDefinition, QueryList } from './api';
 import { ConsoleLogger } from 'mol-util/console-logger';
-import { createRequest, resolveRequest } from './query';
+import { resolveJob } from './query';
+import { JobManager } from './jobs';
+import { UUID } from 'mol-util';
 
 function makePath(p: string) {
     return Config.appPrefix + '/' + p;
@@ -16,9 +18,9 @@ function makePath(p: string) {
 
 function wrapResponse(fn: string, res: express.Response) {
     const w = {
-        do404(this: any) {
+        doError(this: any, code = 404) {
             if (!this.headerWritten) {
-                res.writeHead(404);
+                res.writeHead(code);
                 this.headerWritten = true;
             }
             this.end();
@@ -53,15 +55,44 @@ function wrapResponse(fn: string, res: express.Response) {
     return w;
 }
 
+const responseMap = new Map<UUID, express.Response>();
+
+async function processNextJob() {
+    if (!JobManager.hasNext()) return;
+
+    const job = JobManager.getNext();
+    const response = responseMap.get(job.id)!;
+    responseMap.delete(job.id);
+
+    const filenameBase = `${job.entryId}_${job.queryDefinition.name.replace(/\s/g, '_')}`
+    const writer = wrapResponse(job.responseFormat.isBinary ? `${filenameBase}.bcif` : `${filenameBase}.cif`, response);
+    try {
+        writer.writeHeader(job.responseFormat.isBinary);
+        await resolveJob(job, writer);
+        writer.end();
+    } catch (e) {
+        ConsoleLogger.errorId(job.id, '' + e);
+        // TODO: add some error?
+        writer.doError(404);
+    } finally {
+        setImmediate(processNextJob);
+    }
+}
+
 function mapQuery(app: express.Express, queryName: string, queryDefinition: QueryDefinition) {
     app.get(makePath(':entryId/' + queryName), async (req, res) => {
         ConsoleLogger.log('Server', `Query '${req.params.entryId}/${queryName}'...`);
 
-        const request = createRequest('pdb', req.params.entryId, queryName, req.query);
-        const writer = wrapResponse(request.responseFormat.isBinary ? 'result.bcif' : 'result.cif', res);
-        writer.writeHeader(request.responseFormat.isBinary);
-        await resolveRequest(request, writer);
-        writer.end();
+        if (JobManager.size >= Config.maxQueueLength) {
+            // TODO use proper code: server busy
+            res.writeHead(404);
+            res.end();
+            return;
+        }
+
+        const jobId = JobManager.add('pdb', req.params.entryId, queryName, req.query);
+        responseMap.set(jobId, res);
+        processNextJob();
     });
 }
 

+ 4 - 3
src/servers/model/test.ts

@@ -1,6 +1,7 @@
-import { createRequest, resolveRequest } from './server/query';
+import { resolveJob } from './server/query';
 import * as fs from 'fs'
 import { StructureCache } from './server/structure-wrapper';
+import { createJob } from './server/jobs';
 
 function wrapFile(fn: string) {
     const w = {
@@ -34,9 +35,9 @@ function wrapFile(fn: string) {
 
 async function run() {
     try {
-        const request = createRequest('_local_', 'e:/test/quick/1cbs_updated.cif', 'residueInteraction', { label_comp_id: 'REA' });
+        const request = createJob('_local_', 'e:/test/quick/1cbs_updated.cif', 'residueInteraction', { label_comp_id: 'REA' });
         const writer = wrapFile('e:/test/mol-star/1cbs_full.cif');
-        await resolveRequest(request, writer);
+        await resolveJob(request, writer);
         writer.end();
     } finally {
         StructureCache.expireAll();