Browse Source

model-server: support tar.gz output in local mode

David Sehnal 5 years ago
parent
commit
e3e7fa3040

+ 2 - 2
src/mol-util/string.ts

@@ -11,8 +11,8 @@ export function indentString(str: string, count: number, indent: string) {
 }
 
 /** Add space between camelCase text. */
-export function splitCamelCase(str: string) {
-    return str.replace(/([a-z\xE0-\xFF])([A-Z\xC0\xDF])/g, '$1 $2')
+export function splitCamelCase(str: string, separator = ' ') {
+    return str.replace(/([a-z\xE0-\xFF])([A-Z\xC0\xDF])/g, `$1${separator}$2`)
 }
 
 /** Split camelCase text and capitalize. */

+ 30 - 11
src/servers/model/query.ts

@@ -13,24 +13,43 @@ console.log(`Mol* ModelServer (${Version}), (c) 2018-2020 Mol* authors`);
 console.log(``);
 
 let exampleWorkload: LocalInput = [{
-    input: 'c:/test/quick/1tqn.cif',
     output: 'c:/test/quick/localapi/1tqn_full.cif',
-    query: 'full', // same as defined in Api/Queries
+    queries: [{
+        input: 'c:/test/quick/1tqn.cif',
+        query: 'full', // same as defined in Api/Queries
+    }]
 }, {
-    input: 'c:/test/quick/1tqn.cif',
     output: 'c:/test/quick/localapi/1tqn_full.bcif',
-    query: 'full',
-    params: {}
+    queries: [{
+        input: 'c:/test/quick/1tqn.cif',
+        query: 'full'
+    }]
 }, {
-    input: 'c:/test/quick/1cbs_updated.cif',
     output: 'c:/test/quick/localapi/1cbs_ligint.cif',
-    query: 'residueInteraction', // action is case sensitive
-    params: { atom_site: { label_comp_id: 'REA' }, radius: 5 }
+    queries: [{
+        input: 'c:/test/quick/1cbs_updated.cif',
+        query: 'residueInteraction', // action is case sensitive
+        params: { atom_site: { label_comp_id: 'REA' }, radius: 5 }
+    }]
 }, {
-    input: 'c:/test/quick/1cbs_updated.cif', // multiple files that are repeated will only be parsed once
     output: 'c:/test/quick/localapi/1cbs_ligint.bcif',
-    query: 'residueInteraction',
-    params: { atom_site: { label_comp_id: 'REA' } } // parameters are just a JSON version of the query string
+    queries: [{
+        input: 'c:/test/quick/1cbs_updated.cif', // multiple files that are repeated will only be parsed once
+        query: 'residueInteraction',
+        params: { atom_site: [{ label_comp_id: 'REA' }], radius: 5 } // parameters are just a JSON version of the query string
+    }]
+}, {
+    output: 'c:/test/quick/localapi/multiple.tar.gz',
+    queries: [{
+        input: 'c:/test/quick/1cbs_updated.cif',
+        query: 'residueInteraction', // action is case sensitive
+        params: { atom_site: { label_comp_id: 'REA' }, radius: 5 }
+    }, {
+        input: 'c:/test/quick/1tqn.cif',
+        query: 'full', // same as defined in Api/Queries
+    }],
+    asTarGz: true,
+    gzipLevel: 6
 }];
 
 

+ 35 - 15
src/servers/model/server/api-local.ts

@@ -7,19 +7,26 @@
 import { ConsoleLogger } from '../../../mol-util/console-logger';
 import { now } from '../../../mol-util/now';
 import { PerformanceMonitor } from '../../../mol-util/performance-monitor';
-import { FileResultWriter } from '../utils/writer';
-import { QueryName } from './api';
+import { FileResultWriter, TarballFileResultWriter } from '../utils/writer';
+import { QueryName, QueryParams } from './api';
 import { Job, JobEntry, JobManager } from './jobs';
 import { resolveJob } from './query';
 import { StructureCache } from './structure-wrapper';
 
-export type LocalInput = {
+
+export type Entry<Q extends QueryName = QueryName> = {
     input: string,
-    output: string,
-    query: QueryName,
+    query: Q,
     modelNums?: number[],
-    params?: any,
-    binary?: boolean
+    params?: QueryParams<Q>,
+}
+
+export type LocalInput = {
+    queries: Entry[],
+    output: string,
+    binary?: boolean,
+    asTarGz?: boolean,
+    gzipLevel?: number
 }[];
 
 export async function runLocal(input: LocalInput) {
@@ -31,16 +38,19 @@ export async function runLocal(input: LocalInput) {
     for (const job of input) {
         const binary = /\.bcif/.test(job.output);
         JobManager.add({
-            entries: [JobEntry({
-                entryId: job.input,
-                queryName: job.query,
-                queryParams: job.params || { },
-                modelNums: job.modelNums,
-            })],
-            writer: new FileResultWriter(job.output),
+            entries: job.queries.map(q => JobEntry({
+                entryId: q.input,
+                queryName: q.query,
+                queryParams: q.params || { },
+                modelNums: q.modelNums,
+            })),
+            writer: job.asTarGz
+                ? new TarballFileResultWriter(job.output, job.gzipLevel)
+                : new FileResultWriter(job.output),
             options: {
                 outputFilename: job.output,
-                binary
+                binary,
+                tarball: job.asTarGz
             }
         });
     }
@@ -57,6 +67,8 @@ export async function runLocal(input: LocalInput) {
             job.writer.end();
             ConsoleLogger.logId(job.id, 'Query', 'Written.');
 
+            if (job.entries.length > 0) StructureCache.expireAll();
+
             if (JobManager.hasNext()) {
                 job = JobManager.getNext();
                 if (key !== job.entries[0].key) StructureCache.expire(key);
@@ -66,6 +78,14 @@ export async function runLocal(input: LocalInput) {
             }
         } catch (e) {
             ConsoleLogger.errorId(job.id, e);
+
+            if (JobManager.hasNext()) {
+                job = JobManager.getNext();
+                if (key !== job.entries[0].key) StructureCache.expire(key);
+                key = job.entries[0].key;
+            } else {
+                break;
+            }
         }
         ConsoleLogger.log('Progress', `[${++progress}/${input.length}] after ${PerformanceMonitor.format(now() - started)}.`);
     }

+ 2 - 1
src/servers/model/server/api-web.ts

@@ -18,6 +18,7 @@ import { getApiSchema, shortcutIconLink } from './api-schema';
 import { swaggerUiAssetsHandler, swaggerUiIndexHandler } from '../../common/swagger-ui';
 import { MultipleQuerySpec, getMultiQuerySpecFilename } from './api-web-multiple';
 import { SimpleResponseResultWriter, WebResutlWriter, TarballResponseResultWriter } from '../utils/writer';
+import { splitCamelCase } from '../../../mol-util/string';
 
 function makePath(p: string) {
     return Config.apiPrefix + '/' + p;
@@ -47,7 +48,7 @@ async function processNextJob() {
 
 export function createResultWriter(response: express.Response, isBinary: boolean, entryId?: string, queryName?: string) {
     const filenameBase = entryId && queryName
-        ? `${entryId}_${queryName.replace(/\s/g, '_')}`
+        ? `${entryId}_${splitCamelCase(queryName.replace(/\s/g, '_'), '-').toLowerCase()}`
         : `result`;
     return new SimpleResponseResultWriter(isBinary ? `${filenameBase}.bcif` : `${filenameBase}.cif`, response, isBinary);
 }

+ 2 - 2
src/servers/model/server/api.ts

@@ -171,10 +171,10 @@ const QueryMap = {
                     ? ctx => test.entityTest!(ctx) && ctx.element.unit.conformation.operator.isIdentity
                     : ctx => ctx.element.unit.conformation.operator.isIdentity
             })));
-            return Queries.modifiers.includeSurroundings(center, { radius: p.radius, wholeResidues: true });
+            return Queries.modifiers.includeSurroundings(center, { radius: p.radius !== void 0 ? p.radius : 5, wholeResidues: true });
         },
         structureTransform(p, s) {
-            return StructureSymmetry.builderSymmetryMates(s, p.radius).run();
+            return StructureSymmetry.builderSymmetryMates(s, p.radius !== void 0 ? p.radius : 5).run();
         },
         jsonParams: [ AtomSiteTestJsonParam, RadiusParam ],
         restParams: [ ...AtomSiteTestRestParams, RadiusParam ],

+ 31 - 7
src/servers/model/server/query.ts

@@ -4,20 +4,22 @@
  * @author David Sehnal <david.sehnal@gmail.com>
  */
 
+import * as path from 'path';
 import { Column } from '../../../mol-data/db';
 import { CifWriter } from '../../../mol-io/writer/cif';
-import { StructureQuery, StructureSelection, Structure } from '../../../mol-model/structure';
+import { Structure, StructureQuery, StructureSelection } from '../../../mol-model/structure';
 import { encode_mmCIF_categories } from '../../../mol-model/structure/export/mmcif';
 import { Progress } from '../../../mol-task';
-import { now } from '../../../mol-util/now';
 import { ConsoleLogger } from '../../../mol-util/console-logger';
+import { now } from '../../../mol-util/now';
 import { PerformanceMonitor } from '../../../mol-util/performance-monitor';
 import { ModelServerConfig as Config } from '../config';
+import { createModelPropertiesProviderFromConfig, ModelPropertiesProvider } from '../property-provider';
 import Version from '../version';
 import { Job, JobEntry } from './jobs';
-import { createStructureWrapperFromJobEntry, StructureWrapper, resolveStructures } from './structure-wrapper';
+import { createStructureWrapperFromJobEntry, resolveStructures, StructureWrapper } from './structure-wrapper';
 import CifField = CifWriter.Field
-import { createModelPropertiesProviderFromConfig, ModelPropertiesProvider } from '../property-provider';
+import { splitCamelCase } from '../../../mol-util/string';
 
 export interface Stats {
     structure: StructureWrapper,
@@ -55,6 +57,7 @@ async function resolveSingleFile(job: Job) {
     const headerMap = new Map<string, number>();
 
     for (const entry of job.entries) {
+        let hasDataBlock = false;
         try {
             const structure = await createStructureWrapperFromJobEntry(entry, propertyProvider());
 
@@ -68,12 +71,17 @@ async function resolveSingleFile(job: Job) {
             }
 
             encoder.startDataBlock(header);
+            hasDataBlock = true;
             await resolveJobEntry(entry, structure, encoder);
         } catch (e) {
             if (job.entries.length === 1) {
                 throw e;
             } else {
+                if (!hasDataBlock) {
+                    createErrorDataBlock(entry, encoder);
+                }
                 doError(entry, encoder, e);
+                ConsoleLogger.errorId(entry.job.id, '' + e);
             }
         }
     }
@@ -83,8 +91,8 @@ async function resolveSingleFile(job: Job) {
     encoder.writeTo(job.writer);
 }
 
-function getFilename(i: number, entry: JobEntry, isBinary: boolean) {
-    return `${i}_${entry.entryId}_${entry.queryDefinition.name.replace(/\s/g, '_')}.${isBinary ? 'bcif' : 'cif'}`;
+function getFilename(i: number, entry: JobEntry, header: string, isBinary: boolean) {
+    return `${i}_${header.toLowerCase()}_${splitCamelCase(entry.queryDefinition.name.replace(/\s/g, '_'), '-').toLowerCase()}.${isBinary ? 'bcif' : 'cif'}`;
 }
 
 async function resolveMultiFile(job: Job) {
@@ -99,18 +107,26 @@ async function resolveMultiFile(job: Job) {
             binaryAutoClassifyEncoding: true
         });
 
+        let hasDataBlock = false;
+        let header = '';
         try {
             const structure = await createStructureWrapperFromJobEntry(entry, propertyProvider());
+            header = structure.cifFrame.header;
             encoder.startDataBlock(structure.cifFrame.header);
+            hasDataBlock = true;
             await resolveJobEntry(entry, structure, encoder);
         } catch(e) {
+            if (!hasDataBlock) {
+                header = createErrorDataBlock(entry, encoder);
+            }
+            ConsoleLogger.errorId(entry.job.id, '' + e);
             doError(entry, encoder, e);
         }
 
         ConsoleLogger.logId(job.id, 'Query', `Encoding ${entry.key}/${entry.queryDefinition.name}`);
         encoder.encode();
 
-        job.writer.beginEntry(getFilename(++i, entry, job.responseFormat.isBinary), encoder.getSize());
+        job.writer.beginEntry(getFilename(++i, entry, header, job.responseFormat.isBinary), encoder.getSize());
         encoder.writeTo(job.writer);
         job.writer.endEntry();
         ConsoleLogger.logId(job.id, 'Query', `Written ${entry.key}/${entry.queryDefinition.name}`);
@@ -119,6 +135,14 @@ async function resolveMultiFile(job: Job) {
     }
 }
 
+function createErrorDataBlock(job: JobEntry, encoder: CifWriter.Encoder<any>) {
+    let header;
+    if (job.sourceId === '_local_') header = path.basename(job.entryId).replace(/[^a-z0-9\-]/gi, '').toUpperCase();
+    else header = job.entryId.replace(/[^a-z0-9\-]/gi, '').toUpperCase();
+    encoder.startDataBlock(header);
+    return header;
+}
+
 async function resolveJobEntry(entry: JobEntry, structure: StructureWrapper, encoder: CifWriter.Encoder<any>) {
     ConsoleLogger.logId(entry.job.id, 'Query', `Start ${entry.key}/${entry.queryDefinition.name}.`);
 

+ 62 - 11
src/servers/model/utils/writer.ts

@@ -32,7 +32,7 @@ export class SimpleResponseResultWriter implements WebResutlWriter {
         throw new Error('Not supported');
     }
 
-    async endEntry() {
+    endEntry() {
         throw new Error('Not supported');
     }
 
@@ -57,7 +57,7 @@ export class SimpleResponseResultWriter implements WebResutlWriter {
     }
 
     writeBinary(data: Uint8Array) {
-        return this.res.write(Buffer.from(data.buffer));
+        return this.res.write(Buffer.from(data.buffer, data.byteOffset, data.byteLength));
     }
 
     writeString(this: any, data: string) {
@@ -105,7 +105,7 @@ export class TarballResponseResultWriter implements WebResutlWriter {
     writeHeader() {
         if (this.headerWritten) return;
 
-        this.stream.pipe(this.res);
+        this.stream.pipe(this.res, { end: true });
         this.stream.on('end', () => this.res.end());
 
         this.headerWritten = true;
@@ -119,7 +119,7 @@ export class TarballResponseResultWriter implements WebResutlWriter {
 
     writeBinary(data: Uint8Array) {
         this.writeHeader();
-        return !!this.stream.write(Buffer.from(data.buffer));
+        return !!this.stream.write(Buffer.from(data.buffer, data.byteOffset, data.byteLength));
     }
 
     writeString(data: string) {
@@ -144,44 +144,95 @@ export class TarballResponseResultWriter implements WebResutlWriter {
 }
 
 export class FileResultWriter implements ResultWriter {
-    private file = 0;
+    private file: fs.WriteStream | undefined = void 0;
     private ended = false;
     private opened = false;
 
-    async beginEntry(name: string) {
+    beginEntry(name: string) {
         throw new Error('Not supported');
     }
 
-    async endEntry() {
+    endEntry() {
         throw new Error('Not supported');
     }
 
     open() {
         if (this.opened) return;
         makeDir(path.dirname(this.fn));
-        this.file = fs.openSync(this.fn, 'w');
+        this.file = fs.createWriteStream(this.fn);
         this.opened = true;
     }
 
     writeBinary(data: Uint8Array) {
         this.open();
-        fs.writeSync(this.file, Buffer.from(data.buffer));
+        this.file?.write(Buffer.from(data.buffer, data.byteOffset, data.byteLength));
         return true;
     }
 
     writeString(data: string) {
         this.open();
-        fs.writeSync(this.file, data);
+        this.file?.write(data);
         return true;
     }
 
     end() {
         if (!this.opened || this.ended) return;
-        fs.close(this.file, function () { });
+        this.file?.end();
         this.ended = true;
     }
 
     constructor(private fn: string) {
 
+    }
+}
+
+export class TarballFileResultWriter implements ResultWriter {
+    private file: fs.WriteStream | undefined = void 0;
+    private ended = false;
+    private opened = false;
+    private stream = zlib.createGzip({ level: this.gzipLevel, memLevel: 9, chunkSize: 16 * 16384 });
+    private entrySize = 0;
+
+    beginEntry(name: string, size: number) {
+        const header = encodeTarHeader({ name, size });
+        this.entrySize = size;
+        this.stream.write(header);
+    }
+
+    endEntry() {
+        const size = this.entrySize & 511;
+        if (size) this.stream.write(END_OF_TAR.slice(0, 512 - size));
+    }
+
+    open() {
+        if (this.opened) return;
+        makeDir(path.dirname(this.fn));
+        this.file = fs.createWriteStream(this.fn);
+        this.stream.pipe(this.file, { end: true });
+
+        this.opened = true;
+    }
+
+    writeBinary(data: Uint8Array) {
+        this.open();
+        this.stream.write(Buffer.from(data.buffer, data.byteOffset, data.byteLength));
+        return true;
+    }
+
+    writeString(data: string) {
+        this.open();
+        this.stream.write(data);
+        return true;
+    }
+
+    end() {
+        if (!this.opened || this.ended) return;
+        this.stream.write(END_OF_TAR);
+        this.stream.end();
+        this.ended = true;
+    }
+
+    constructor(private fn: string, private gzipLevel: number = 6) {
+
     }
 }