Browse Source

model-server: query-many result as tar.gz

David Sehnal 5 years ago
parent
commit
6e60a2f9dc

+ 4 - 0
src/mol-io/writer/cif/encoder/binary.ts

@@ -94,6 +94,10 @@ export default class BinaryEncoder implements Encoder<Uint8Array> {
         return this.encodedData;
     }
 
+    getSize() {
+        return this.encodedData.length;
+    }
+
     constructor(encoder: string, encodingProvider: BinaryEncodingProvider | undefined, private autoClassify: boolean) {
         this.binaryEncodingProvider = encodingProvider;
         this.data = {

+ 4 - 0
src/mol-io/writer/cif/encoder/text.ts

@@ -69,6 +69,10 @@ export default class TextEncoder implements Encoder<string> {
         }
     }
 
+    getSize() {
+        return StringBuilder.getSize(this.builder);
+    }
+
     getData() {
         return StringBuilder.getString(this.builder);
     }

+ 2 - 1
src/mol-io/writer/encoder.ts

@@ -8,7 +8,8 @@ import Writer from './writer'
 
 interface Encoder {
     encode(): void,
-    writeTo(writer: Writer): void
+    writeTo(writer: Writer): void,
+    getSize(): number
 }
 
 export default Encoder

+ 7 - 0
src/mol-util/string-builder.ts

@@ -38,6 +38,13 @@ namespace StringBuilder {
         return builder.chunks.join('');
     }
 
+    export function getSize(builder: StringBuilder): number {
+        let size = 0;
+        for (const c of builder.chunks) size += c.length;
+        for (let i = 0; i < builder.offset; i++) size += builder.current[i].length;
+        return size;
+    }
+
     export function getChunks(builder: StringBuilder): string[] {
         if (builder.offset > 0) {
             if (builder.current.length === builder.offset) builder.chunks[builder.chunks.length] = builder.current.join('');

+ 4 - 0
src/servers/model/CHANGELOG.md

@@ -1,3 +1,7 @@
+# 0.9.1
+* query-many
+* Config overhaul
+
 # 0.9.0
 * REST API support.
 * Swagger UI support.

+ 5 - 5
src/servers/model/preprocess/preprocess.ts

@@ -9,9 +9,9 @@ import { classifyCif } from './converter';
 import { Structure } from '../../../mol-model/structure';
 import { CifWriter } from '../../../mol-io/writer/cif';
 import Writer from '../../../mol-io/writer/writer';
-import { wrapFileToWriter } from '../server/api-local';
 import { encode_mmCIF_categories, CifExportContext } from '../../../mol-model/structure/export/mmcif';
 import { ModelPropertiesProvider } from '../property-provider';
+import { FileResultWriter } from '../utils/writer';
 
 // TODO: error handling
 
@@ -28,14 +28,14 @@ async function preprocess(filename: string, propertyProvider?: ModelPropertiesPr
     const exportCtx = CifExportContext.create(inputStructures);
 
     if (outputCif) {
-        const writer = wrapFileToWriter(outputCif);
+        const writer = new FileResultWriter(outputCif);
         const encoder = CifWriter.createEncoder({ binary: false });
         encode(inputStructures[0], input.cifFrame.header, categories, encoder, exportCtx, writer);
         writer.end();
     }
 
     if (outputBcif) {
-        const writer = wrapFileToWriter(outputBcif);
+        const writer = new FileResultWriter(outputBcif);
         const encoder = CifWriter.createEncoder({ binary: true, binaryAutoClassifyEncoding: true });
         encode(inputStructures[0], input.cifFrame.header, categories, encoder, exportCtx, writer);
         writer.end();
@@ -47,14 +47,14 @@ async function convert(filename: string, outputCif?: string, outputBcif?: string
     const categories = await classifyCif(frame);
 
     if (outputCif) {
-        const writer = wrapFileToWriter(outputCif);
+        const writer = new FileResultWriter(outputCif);
         const encoder = CifWriter.createEncoder({ binary: false });
         encodeConvert(frame.header, categories, encoder, writer);
         writer.end();
     }
 
     if (outputBcif) {
-        const writer = wrapFileToWriter(outputBcif);
+        const writer = new FileResultWriter(outputBcif);
         const encoder = CifWriter.createEncoder({ binary: true, binaryAutoClassifyEncoding: true });
         encodeConvert(frame.header, categories, encoder, writer);
         writer.end();

+ 2 - 4
src/servers/model/server/api-local.ts

@@ -53,10 +53,8 @@ export async function runLocal(input: LocalInput) {
     let progress = 0;
     while (job) {
         try {
-            const encoder = await resolveJob(job);
-            const writer = job.writer;
-            encoder.writeTo(writer);
-            writer.end();
+            await resolveJob(job);
+            job.writer.end();
             ConsoleLogger.logId(job.id, 'Query', 'Written.');
 
             if (JobManager.hasNext()) {

+ 2 - 1
src/servers/model/server/api-schema.ts

@@ -54,7 +54,8 @@ function getPaths() {
             { entryId: '1cbs', query: 'residueInteraction', params: { atom_site: [{ label_comp_id: 'REA' }], radius: 5 } },
             { entryId: '1tqn', query: 'full' }
         ],
-        encoding: 'cif'
+        encoding: 'cif',
+        asTarGz: false
     };
     ret[`${ServerConfig.apiPrefix}/v1/query-many`] = {
         get: {

+ 7 - 1
src/servers/model/server/api-web-multiple.ts

@@ -16,5 +16,11 @@ export interface MultipleQueryEntry<Name extends QueryName = QueryName> {
 
 export interface MultipleQuerySpec {
     queries: MultipleQueryEntry[],
-    encoding?: 'cif' | 'bcif'
+    encoding?: 'cif' | 'bcif',
+    asTarGz?: boolean
+}
+
+export function getMultiQuerySpecFilename() {
+    const date = new Date();
+    return `result_${date.getMonth() + 1}-${date.getDate()}-${date.getHours()}-${date.getMinutes()}-${date.getSeconds()}.tar.gz`;
 }

+ 9 - 6
src/servers/model/server/api-web.ts

@@ -16,8 +16,8 @@ import { UUID } from '../../../mol-util';
 import { QueryDefinition, normalizeRestQueryParams, normalizeRestCommonParams, QueryList } from './api';
 import { getApiSchema, shortcutIconLink } from './api-schema';
 import { swaggerUiAssetsHandler, swaggerUiIndexHandler } from '../../common/swagger-ui';
-import { MultipleQuerySpec } from './api-web-multiple';
-import { SimpleResponseResultWriter, WebResutlWriter } from '../utils/writer';
+import { MultipleQuerySpec, getMultiQuerySpecFilename } from './api-web-multiple';
+import { SimpleResponseResultWriter, WebResutlWriter, TarballResponseResultWriter } from '../utils/writer';
 
 function makePath(p: string) {
     return Config.apiPrefix + '/' + p;
@@ -33,9 +33,8 @@ async function processNextJob() {
     const writer = job.writer as WebResutlWriter;
 
     try {
-        const encoder = await resolveJob(job);
         writer.writeHeader();
-        encoder.writeTo(writer);
+        await resolveJob(job);
     } catch (e) {
         ConsoleLogger.errorId(job.id, '' + e);
         writer.doError(404, '' + e);
@@ -119,6 +118,10 @@ function serveStatic(req: express.Request, res: express.Response) {
 }
 
 function createMultiJob(spec: MultipleQuerySpec, res: express.Response) {
+    const writer = spec.asTarGz
+        ? new TarballResponseResultWriter(getMultiQuerySpecFilename(), res)
+        : createResultWriter(res, spec.encoding?.toLowerCase() === 'bcif')
+
     const jobId = JobManager.add({
         entries: spec.queries.map(q => JobEntry({
             sourceId: q.data_source || ModelServerConfig.defaultSource,
@@ -127,8 +130,8 @@ function createMultiJob(spec: MultipleQuerySpec, res: express.Response) {
             queryParams: q.params || { },
             modelNums: q.model_nums
         })),
-        writer: createResultWriter(res, spec.encoding?.toLowerCase() === 'bcif'),
-        options: { binary: spec.encoding?.toLowerCase() === 'bcif' }
+        writer,
+        options: { binary: spec.encoding?.toLowerCase() === 'bcif', tarball: spec.asTarGz }
     });
     responseMap.set(jobId, res);
     if (JobManager.size === 1) processNextJob();

+ 8 - 7
src/servers/model/server/jobs.ts

@@ -10,6 +10,7 @@ import { LinkedList } from '../../../mol-data/generic';
 import { ResultWriter } from '../utils/writer';
 
 export interface ResponseFormat {
+    tarball: boolean,
     isBinary: boolean
 }
 
@@ -25,6 +26,12 @@ export interface Job {
     writer: ResultWriter
 }
 
+export interface JobDefinition {
+    entries: JobEntry[],
+    writer: ResultWriter,
+    options?: { outputFilename?: string, binary?: boolean, tarball?: boolean }
+}
+
 export interface JobEntry {
     job: Job,
     sourceId: '_local_' | string,
@@ -62,19 +69,13 @@ export function JobEntry<Name extends QueryName>(definition: JobEntryDefinition<
     }
 }
 
-export interface JobDefinition {
-    entries: JobEntry[],
-    writer: ResultWriter,
-    options?: { outputFilename?: string, binary?: boolean }
-}
-
 export function createJob(definition: JobDefinition): Job {
     const job: Job = {
         id: UUID.create22(),
         datetime_utc: `${new Date().toISOString().replace(/T/, ' ').replace(/\..+/, '')}`,
         entries: definition.entries,
         writer: definition.writer,
-        responseFormat: { isBinary: !!(definition.options && definition.options.binary) },
+        responseFormat: { isBinary: !!(definition.options && definition.options.binary), tarball: !!definition?.options?.tarball },
         outputFilename: definition.options && definition.options.outputFilename
     };
     definition.entries.forEach(e => e.job = job);

+ 67 - 8
src/servers/model/server/query.ts

@@ -35,7 +35,15 @@ function propertyProvider() {
     return _propertyProvider;
 }
 
-export async function resolveJob(job: Job): Promise<CifWriter.Encoder<any>> {
+export async function resolveJob(job: Job) {
+    if (job.responseFormat.tarball) {
+        return resolveMultiFile(job);
+    } else {
+        return resolveSingleFile(job);
+    }
+}
+
+async function resolveSingleFile(job: Job) {
     ConsoleLogger.logId(job.id, 'Query', 'Starting.');
 
     const encoder = CifWriter.createEncoder({
@@ -44,20 +52,71 @@ export async function resolveJob(job: Job): Promise<CifWriter.Encoder<any>> {
         binaryAutoClassifyEncoding: true
     });
 
-    // TODO: how to handle missing entries?
+    const headerMap = new Map<string, number>();
+
     for (const entry of job.entries) {
-        const structure = await createStructureWrapperFromJobEntry(entry, propertyProvider());
+        try {
+            const structure = await createStructureWrapperFromJobEntry(entry, propertyProvider());
+
+            let header = structure.cifFrame.header.toUpperCase();
+            if (headerMap.has(header)) {
+                const i = headerMap.get(header)! + 1;
+                headerMap.set(header, i);
+                header += ' ' + i;
+            } else {
+                headerMap.set(header, 0)
+            }
 
-        // TODO: this should be unique in case the same structure is queried twice
-        // const data = (entry.sourceId === '_local_' ? path.basename(entry.entryId) : entry.entryId).replace(/[^a-z0-9\_]/ig, '').toUpperCase();
-        encoder.startDataBlock(structure.cifFrame.header);
-        await resolveJobEntry(entry, structure, encoder);
+            encoder.startDataBlock(header);
+            await resolveJobEntry(entry, structure, encoder);
+        } catch (e) {
+            if (job.entries.length === 1) {
+                throw e;
+            } else {
+                doError(entry, encoder, e);
+            }
+        }
     }
 
     ConsoleLogger.logId(job.id, 'Query', 'Encoding.');
     encoder.encode();
+    encoder.writeTo(job.writer);
+}
+
+function getFilename(i: number, entry: JobEntry, isBinary: boolean) {
+    return `${i}_${entry.entryId}_${entry.queryDefinition.name.replace(/\s/g, '_')}.${isBinary ? 'bcif' : 'cif'}`;
+}
 
-    return encoder;
+async function resolveMultiFile(job: Job) {
+    ConsoleLogger.logId(job.id, 'Query', 'Starting.');
+
+    let i = 0;
+    for (const entry of job.entries) {
+
+        const encoder = CifWriter.createEncoder({
+            binary: job.responseFormat.isBinary,
+            encoderName: `ModelServer ${Version}`,
+            binaryAutoClassifyEncoding: true
+        });
+
+        try {
+            const structure = await createStructureWrapperFromJobEntry(entry, propertyProvider());
+            encoder.startDataBlock(structure.cifFrame.header);
+            await resolveJobEntry(entry, structure, encoder);
+        } catch(e) {
+            doError(entry, encoder, e);
+        }
+
+        ConsoleLogger.logId(job.id, 'Query', `Encoding ${entry.key}/${entry.queryDefinition.name}`);
+        encoder.encode();
+
+        job.writer.beginEntry(getFilename(++i, entry, job.responseFormat.isBinary), encoder.getSize());
+        encoder.writeTo(job.writer);
+        job.writer.endEntry();
+        ConsoleLogger.logId(job.id, 'Query', `Written ${entry.key}/${entry.queryDefinition.name}`);
+
+        // await fileEntry;
+    }
 }
 
 async function resolveJobEntry(entry: JobEntry, structure: StructureWrapper, encoder: CifWriter.Encoder<any>) {

+ 211 - 0
src/servers/model/utils/tar.ts

@@ -0,0 +1,211 @@
+/**
+ * Adapter from https://github.com/mafintosh/tar-stream
+ * Copyright (c) 2014 Mathias Buus, MIT License (MIT)
+ */
+
+import { constants } from 'fs'
+
+let alloc = Buffer.alloc
+
+let ZEROS = '0000000000000000000'
+let SEVENS = '7777777777777777777'
+let ZERO_OFFSET = '0'.charCodeAt(0)
+let USTAR_MAGIC = Buffer.from('ustar\x00', 'binary')
+let USTAR_VER = Buffer.from('00', 'binary')
+let MASK = parseInt('7777', 8)
+let MAGIC_OFFSET = 257
+let VERSION_OFFSET = 263
+
+let toTypeflag = function (flag: string) {
+    switch (flag) {
+        case 'file':
+            return 0
+        case 'link':
+            return 1
+        case 'symlink':
+            return 2
+        case 'character-device':
+            return 3
+        case 'block-device':
+            return 4
+        case 'directory':
+            return 5
+        case 'fifo':
+            return 6
+        case 'contiguous-file':
+            return 7
+        case 'pax-header':
+            return 72
+    }
+
+    return 0
+}
+
+let indexOf = function (block: any, num: any, offset: any, end: any) {
+    for (; offset < end; offset++) {
+        if (block[offset] === num) return offset
+    }
+    return end
+}
+
+let cksum = function (block: any) {
+    let sum = 8 * 32
+    for (let i = 0; i < 148; i++) sum += block[i]
+    for (let j = 156; j < 512; j++) sum += block[j]
+    return sum
+}
+
+let encodeOct = function (val: any, n: any) {
+    val = val.toString(8)
+    if (val.length > n) return SEVENS.slice(0, n) + ' '
+    else return ZEROS.slice(0, n - val.length) + val + ' '
+}
+
+let decodeStr = function (val: any, offset: any, length: any, encoding?: any) {
+    return val.slice(offset, indexOf(val, 0, offset, offset + length)).toString(encoding)
+}
+
+let addLength = function (str: any) {
+    let len = Buffer.byteLength(str)
+    let digits = Math.floor(Math.log(len) / Math.log(10)) + 1
+    if (len + digits >= Math.pow(10, digits)) digits++
+
+    return (len + digits) + str
+}
+
+exports.decodeLongPath = function (buf: any, encoding: any) {
+    return decodeStr(buf, 0, buf.length, encoding)
+}
+
+exports.encodePax = function (opts: any) {
+    let result = ''
+    if (opts.name) result += addLength(' path=' + opts.name + '\n')
+    if (opts.linkname) result += addLength(' linkpath=' + opts.linkname + '\n')
+    let pax = opts.pax
+    if (pax) {
+        for (let key in pax) {
+            result += addLength(' ' + key + '=' + pax[key] + '\n')
+        }
+    }
+    return Buffer.from(result)
+}
+
+exports.decodePax = function (buf: any) {
+    let result: any = {}
+
+    while (buf.length) {
+        let i = 0
+        while (i < buf.length && buf[i] !== 32) i++
+        let len = parseInt(buf.slice(0, i).toString(), 10)
+        if (!len) return result
+
+        let b = buf.slice(i + 1, len - 1).toString()
+        let keyIndex = b.indexOf('=')
+        if (keyIndex === -1) return result
+        result[b.slice(0, keyIndex)] = b.slice(keyIndex + 1)
+
+        buf = buf.slice(len)
+    }
+
+    return result
+}
+
+export interface Headers {
+    name: string;
+    mode?: number;
+    uid?: number;
+    gid?: number;
+    size?: number;
+    mtime?: Date;
+    linkname?: string | null;
+    type?:
+    | 'file'
+    | 'link'
+    | 'symlink'
+    | 'character-device'
+    | 'block-device'
+    | 'directory'
+    | 'fifo'
+    | 'contiguous-file'
+    | 'pax-header'
+    | 'pax-global-header'
+    | 'gnu-long-link-path'
+    | 'gnu-long-path'
+    | null;
+    uname?: string;
+    gname?: string;
+    devmajor?: number;
+    devminor?: number;
+    typeflag?: number
+}
+
+function modeToType(mode: number) {
+    switch (mode & constants.S_IFMT) {
+        case constants.S_IFBLK: return 'block-device'
+        case constants.S_IFCHR: return 'character-device'
+        case constants.S_IFDIR: return 'directory'
+        case constants.S_IFIFO: return 'fifo'
+        case constants.S_IFLNK: return 'symlink'
+    }
+
+    return 'file'
+}
+
+let DMODE = parseInt('755', 8)
+let FMODE = parseInt('644', 8)
+
+function normalizeHeader(header: Headers) {
+    if (!header.size || header.type === 'symlink') header.size = 0
+    if (!header.type) header.type = modeToType(header.mode || 0)
+    if (!header.mode) header.mode = header.type === 'directory' ? DMODE : FMODE
+    if (!header.uid) header.uid = 0
+    if (!header.gid) header.gid = 0
+    if (!header.mtime) header.mtime = new Date()
+}
+
+export const END_OF_TAR = alloc(1024)
+
+export function encodeTarHeader(opts: Headers) {
+    normalizeHeader(opts);
+
+    let buf = alloc(512)
+    let name = opts.name
+    let prefix = ''
+
+    if (opts.typeflag === 5 && name[name.length - 1] !== '/') name += '/'
+    if (Buffer.byteLength(name) !== name.length) return null // utf-8
+
+    while (Buffer.byteLength(name) > 100) {
+        let i = name.indexOf('/')
+        if (i === -1) return null
+        prefix += prefix ? '/' + name.slice(0, i) : name.slice(0, i)
+        name = name.slice(i + 1)
+    }
+
+    if (Buffer.byteLength(name) > 100 || Buffer.byteLength(prefix) > 155) return null
+    if (opts.linkname && Buffer.byteLength(opts.linkname) > 100) return null
+
+    buf.write(name)
+    buf.write(encodeOct(opts.mode! & MASK, 6), 100)
+    buf.write(encodeOct(opts.uid, 6), 108)
+    buf.write(encodeOct(opts.gid, 6), 116)
+    buf.write(encodeOct(opts.size, 11), 124)
+    buf.write(encodeOct((opts.mtime?.getTime()! / 1000) | 0, 11), 136)
+
+    buf[156] = ZERO_OFFSET + toTypeflag(opts.type!)
+
+    if (opts.linkname) buf.write(opts.linkname, 157)
+
+    USTAR_MAGIC.copy(buf, MAGIC_OFFSET)
+    USTAR_VER.copy(buf, VERSION_OFFSET)
+    if (opts.uname) buf.write(opts.uname, 265)
+    if (opts.gname) buf.write(opts.gname, 297)
+    buf.write(encodeOct(opts.devmajor || 0, 6), 329)
+    buf.write(encodeOct(opts.devminor || 0, 6), 337)
+
+    if (prefix) buf.write(prefix, 345)
+
+    buf.write(encodeOct(cksum(buf), 6), 148)
+
+    return buf
+}

+ 82 - 6
src/servers/model/utils/writer.ts

@@ -8,11 +8,12 @@ import * as express from 'express';
 import * as fs from 'fs';
 import * as path from 'path';
 import { makeDir } from '../../../mol-util/make-dir';
+import { encodeTarHeader, END_OF_TAR } from './tar';
+import * as zlib from 'zlib'
 
 export interface ResultWriter {
-    beginEntry(name: string): void,
+    beginEntry(name: string, size: number): void,
     endEntry(): void,
-
     writeBinary(data: Uint8Array): boolean,
     writeString(data: string): boolean,
     end(): void
@@ -25,21 +26,28 @@ export interface WebResutlWriter extends ResultWriter {
 
 export class SimpleResponseResultWriter implements WebResutlWriter {
     private ended = false;
+    private headerWritten = false;
 
     beginEntry(name: string) {
         throw new Error('Not supported');
     }
 
-    endEntry() {
+    async endEntry() {
         throw new Error('Not supported');
     }
 
     doError(code = 404, message = 'Not Found.') {
-        this.res.status(code).send(message);
+        if (!this.headerWritten) {
+            this.headerWritten = true;
+            this.res.status(code).send(message);
+        }
         this.end();
     }
 
     writeHeader() {
+        if (this.headerWritten) return;
+        this.headerWritten = true;
+
         this.res.writeHead(200, {
             'Content-Type': this.isBinary ? 'application/octet-stream' : 'text/plain; charset=utf-8',
             'Access-Control-Allow-Origin': '*',
@@ -67,16 +75,84 @@ export class SimpleResponseResultWriter implements WebResutlWriter {
     }
 }
 
+export class TarballResponseResultWriter implements WebResutlWriter {
+    private ended = false;
+    private headerWritten = false;
+    private stream = zlib.createGzip({ level: 6, memLevel: 9, chunkSize: 16 * 16384 });
+    private entrySize = 0;
+
+
+    beginEntry(name: string, size: number) {
+        this.writeHeader();
+        const header = encodeTarHeader({ name, size });
+        this.entrySize = size;
+        this.stream.write(header);
+    }
+
+    endEntry() {
+        const size = this.entrySize & 511;
+        if (size) this.stream.write(END_OF_TAR.slice(0, 512 - size));
+    }
+
+    doError(code = 404, message = 'Not Found.') {
+        if (!this.headerWritten) {
+            this.headerWritten = true;
+            this.res.status(code).send(message);
+        }
+        this.end();
+    }
+
+    writeHeader() {
+        if (this.headerWritten) return;
+
+        this.stream.pipe(this.res);
+        this.stream.on('end', () => this.res.end());
+
+        this.headerWritten = true;
+        this.res.writeHead(200, {
+            'Content-Type': 'application/tar+gzip',
+            'Access-Control-Allow-Origin': '*',
+            'Access-Control-Allow-Headers': 'X-Requested-With',
+            'Content-Disposition': `inline; filename="${this.fn}"`
+        });
+    }
+
+    writeBinary(data: Uint8Array) {
+        this.writeHeader();
+        return !!this.stream.write(Buffer.from(data.buffer));
+    }
+
+    writeString(data: string) {
+        this.writeHeader();
+        return !!this.stream.write(data);
+    }
+
+    end() {
+        if (this.ended) return;
+        this.ended = true;
+
+        if (!this.headerWritten) {
+            return;
+        }
+
+        this.stream.write(END_OF_TAR);
+        this.stream.end();
+    }
+
+    constructor(private fn: string, private res: express.Response) {
+    }
+}
+
 export class FileResultWriter implements ResultWriter {
     private file = 0;
     private ended = false;
     private opened = false;
 
-    beginEntry(name: string) {
+    async beginEntry(name: string) {
         throw new Error('Not supported');
     }
 
-    endEntry() {
+    async endEntry() {
         throw new Error('Not supported');
     }
 

+ 1 - 1
src/servers/model/version.ts

@@ -4,4 +4,4 @@
  * @author David Sehnal <david.sehnal@gmail.com>
  */
 
-export default '0.9.0';
+export default '0.9.1';