
ModelServer: Custom prop support in pre-processor

David Sehnal, 6 years ago
commit 027538efb8

+ 27 - 19
src/servers/model/preprocess/master.ts

@@ -7,8 +7,7 @@
 import * as fs from 'fs'
 import * as path from 'path'
 import * as argparse from 'argparse'
-import { preprocessFile } from './preprocess';
-import { ParallelPreprocessConfig, runMaster } from './parallel';
+import { runMaster, PreprocessEntry } from './parallel';
 
 const cmdParser = new argparse.ArgumentParser({
     addHelp: true,
@@ -17,14 +16,16 @@ const cmdParser = new argparse.ArgumentParser({
 cmdParser.addArgument(['--input', '-i'], { help: 'Input filename', required: false });
 cmdParser.addArgument(['--outCIF', '-oc'], { help: 'Output CIF filename', required: false });
 cmdParser.addArgument(['--outBCIF', '-ob'], { help: 'Output BinaryCIF filename', required: false });
-cmdParser.addArgument(['--bulk', '-b'], { help: 'Bulk JSON ({ numProcesses?: number, entries: { source: string, cif?: string, bcif?: string }[] })', required: false });
-cmdParser.addArgument(['--folderIn', '-f'], { help: 'Convert folder', required: false });
+// TODO: add back? cmdParser.addArgument(['--bulk', '-b'], { help: 'Bulk JSON ({ numProcesses?: number, entries: { source: string, cif?: string, bcif?: string }[] })', required: false });
+cmdParser.addArgument(['--cfg', '-c'], { help: 'Config file path', required: false });
+cmdParser.addArgument(['--folderIn', '-fin'], { help: 'Convert folder', required: false });
 cmdParser.addArgument(['--folderOutCIF', '-foc'], { help: 'Convert folder text output', required: false });
 cmdParser.addArgument(['--folderOutBCIF', '-fob'], { help: 'Convert folder binary output', required: false });
 cmdParser.addArgument(['--folderNumProcesses', '-fp'], { help: 'Convert folder num processes', required: false });
 
 interface CmdArgs {
-    bulk?: string,
+    // bulk?: string,
+    cfg?: string,
     input?: string,
     outCIF?: string,
     outBCIF?: string,
@@ -34,31 +35,38 @@ interface CmdArgs {
     folderNumProcesses?: string
 }
 
+export interface PreprocessConfig {
+    numProcesses?: number,
+    customPropertyProviders?: string[]
+}
+
 const cmdArgs = cmdParser.parseArgs() as CmdArgs;
 
-if (cmdArgs.input) preprocessFile(cmdArgs.input, cmdArgs.outCIF, cmdArgs.outBCIF);
-else if (cmdArgs.bulk) runBulk(cmdArgs.bulk);
-else if (cmdArgs.folderIn) runFolder(cmdArgs);
+let entries: PreprocessEntry[] = []
+let config: PreprocessConfig = { numProcesses: 1, customPropertyProviders: [] }
 
-function runBulk(input: string) {
-    const config = JSON.parse(fs.readFileSync(input, 'utf8')) as ParallelPreprocessConfig;
-    runMaster(config);
+if (cmdArgs.input) entries.push({ source: cmdArgs.input, cif: cmdArgs.outCIF, bcif: cmdArgs.outBCIF });
+// else if (cmdArgs.bulk) runBulk(cmdArgs.bulk);
+else if (cmdArgs.folderIn) findEntries();
+
+if (cmdArgs.cfg) {
+    config = JSON.parse(fs.readFileSync(cmdArgs.cfg, 'utf8')) as PreprocessConfig;
 }
 
-function runFolder(args: CmdArgs) {
-    const files = fs.readdirSync(args.folderIn!);
-    const config: ParallelPreprocessConfig = { numProcesses: +args.folderNumProcesses! || 1, entries: [] };
+runMaster(config, entries);
+
+function findEntries() {
+    const files = fs.readdirSync(cmdArgs.folderIn!);
     const cifTest = /\.cif$/;
     for (const f of files) {
         if (!cifTest.test(f)) continue;
 
-        config.entries.push({
-            source: path.join(args.folderIn!, f),
-            cif: cmdArgs.folderOutCIF ? path.join(args.folderOutCIF!, f) : void 0,
-            bcif: cmdArgs.folderOutBCIF ? path.join(args.folderOutBCIF!, path.parse(f).name + '.bcif') : void 0,
+        entries.push({
+            source: path.join(cmdArgs.folderIn!, f),
+            cif: cmdArgs.folderOutCIF ? path.join(cmdArgs.folderOutCIF!, f) : void 0,
+            bcif: cmdArgs.folderOutBCIF ? path.join(cmdArgs.folderOutBCIF!, path.parse(f).name + '.bcif') : void 0,
         });
     }
-    runMaster(config);
 }
 
 // example:
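
Note: the trailing "// example:" comment above is truncated in this view. For reference, the new --cfg flag points at a JSON file matching the PreprocessConfig interface introduced in this commit. A minimal sketch of such a file, say preprocess-config.json (the provider module path is hypothetical):

    {
        "numProcesses": 4,
        "customPropertyProviders": ["./lib/servers/model/properties/example"]
    }

A hypothetical invocation of the compiled entry point would then combine it with the folder flags, e.g. --folderIn ./in --folderOutCIF ./out-cif --folderOutBCIF ./out-bcif --cfg ./preprocess-config.json.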

+ 30 - 19
src/servers/model/preprocess/parallel.ts

@@ -9,6 +9,9 @@ import * as cluster from 'cluster'
 import { now } from 'mol-task';
 import { PerformanceMonitor } from 'mol-util/performance-monitor';
 import { preprocessFile } from './preprocess';
+import { createModelPropertiesProviderFromSources } from '../property-provider';
+
+type PreprocessConfig = import('./master').PreprocessConfig
 
 export interface PreprocessEntry {
     source: string,
@@ -16,43 +19,41 @@ export interface PreprocessEntry {
     bcif?: string
 }
 
-export interface ParallelPreprocessConfig {
-    numProcesses?: number,
-    entries: PreprocessEntry[]
-}
-
-export function runMaster(config: ParallelPreprocessConfig) {
-    const parts = partitionArray(config.entries, config.numProcesses || 1);
-    // const numForks = Math.min(parts.length, config.numProcesses);
-
+export function runMaster(config: PreprocessConfig, entries: PreprocessEntry[]) {
     const started = now();
     let progress = 0;
     const onMessage = (msg: any) => {
         if (msg.type === 'tick') {
             progress++;
             const elapsed = now() - started;
-            console.log(`[${progress}/${config.entries.length}] in ${PerformanceMonitor.format(elapsed)} (avg ${PerformanceMonitor.format(elapsed / progress)}).`);
+            console.log(`[${progress}/${entries.length}] in ${PerformanceMonitor.format(elapsed)} (avg ${PerformanceMonitor.format(elapsed / progress)}).`);
         } else if (msg.type === 'error') {
             console.error(`${msg.id}: ${msg.error}`)
         }
     }
 
-    for (const _ of parts) {
-        const worker = cluster.fork();
-        worker.on('message', onMessage);
-    }
+    if (entries.length === 1) {
+        runSingle(entries[0], config, onMessage);
+    } else {
+        const parts = partitionArray(entries, config.numProcesses || 1);
+        for (const _ of parts) {
+            const worker = cluster.fork();
+            worker.on('message', onMessage);
+        }
 
-    let i = 0;
-    for (const id in cluster.workers) {
-        cluster.workers[id]!.send(parts[i++]);
+        let i = 0;
+        for (const id in cluster.workers) {
+            cluster.workers[id]!.send({ entries: parts[i++], config });
+        }
     }
 }
 
 export function runChild() {
-    process.on('message', async (entries: PreprocessEntry[]) => {
+    process.on('message', async ({ entries, config }: { entries: PreprocessEntry[], config: PreprocessConfig }) => {
+        const props = createModelPropertiesProviderFromSources(config.customPropertyProviders || []);
         for (const entry of entries) {
             try {
-                await preprocessFile(entry.source, entry.cif, entry.bcif);
+                await preprocessFile(entry.source, props, entry.cif, entry.bcif);
             } catch (e) {
                 process.send!({ type: 'error', id: path.parse(entry.source).name, error: '' + e });
             }
@@ -62,6 +63,16 @@ export function runChild() {
     });
 }
 
+async function runSingle(entry: PreprocessEntry, config: PreprocessConfig, onMessage: (msg: any) => void) {
+    const props = createModelPropertiesProviderFromSources(config.customPropertyProviders || []);
+    try {
+        await preprocessFile(entry.source, props, entry.cif, entry.bcif);
+    } catch (e) {
+        onMessage({ type: 'error', id: path.parse(entry.source).name, error: '' + e });
+    }
+    onMessage({ type: 'tick' });
+}
+
 function partitionArray<T>(xs: T[], count: number): T[][] {
     const ret: T[][] = [];
     const s = Math.ceil(xs.length / count);
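
With this change the master/worker protocol carries the config alongside each worker's slice of entries. A sketch of the message shape, with field names taken from the diff above (the interface name itself is only illustrative):

    // Payload sent from runMaster to each forked child and consumed by runChild.
    interface WorkerPayload {
        entries: PreprocessEntry[], // one partition produced by partitionArray
        config: PreprocessConfig    // numProcesses + customPropertyProviders
    }

The single-entry case now bypasses cluster.fork entirely and calls runSingle in the master process, which avoids forking a worker just to preprocess one file.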

+ 16 - 37
src/servers/model/preprocess/preprocess.ts

@@ -4,66 +4,45 @@
  * @author David Sehnal <david.sehnal@gmail.com>
  */
 
-import { readStructureWrapper, resolveStructure } from '../server/structure-wrapper';
+import { readStructureWrapper, resolveStructures } from '../server/structure-wrapper';
 import { classifyCif } from './converter';
-// import { ConsoleLogger } from 'mol-util/console-logger';
 import { Structure } from 'mol-model/structure';
 import { CifWriter } from 'mol-io/writer/cif';
 import Writer from 'mol-io/writer/writer';
 import { wrapFileToWriter } from '../server/api-local';
-import { Task/*, now*/ } from 'mol-task';
-import { /*showProgress, clearLine */ } from './util';
 import { encode_mmCIF_categories, CifExportContext } from 'mol-model/structure/export/mmcif';
+import { ModelPropertiesProvider } from '../property-provider';
 
 // TODO: error handling
-// let linearId = 0;
 
-export async function preprocessFile(filename: string, outputCif?: string, outputBcif?: string) {
-    // linearId++;
-
-    //const started = now();
-    //ConsoleLogger.log(`${linearId}`, `Reading '${filename}'...`);
-    // TODO: support the custom prop provider list here.
-    const input = await readStructureWrapper('entry', '_local_', filename, void 0);
+export async function preprocessFile(filename: string, propertyProvider?: ModelPropertiesProvider, outputCif?: string, outputBcif?: string) {
+    const input = await readStructureWrapper('entry', '_local_', filename, propertyProvider);
     const categories = await classifyCif(input.cifFrame);
-    const inputStructure = (await resolveStructure(input))!;
-    //ConsoleLogger.log(`${linearId}`, `Classifying CIF categories...`);
-    //clearLine();
-
-    const exportCtx = CifExportContext.create(inputStructure);
+    const inputStructures = (await resolveStructures(input))!;
+    const exportCtx = CifExportContext.create(inputStructures);
 
     if (outputCif) {
-        //ConsoleLogger.log(`${linearId}`, `Encoding CIF...`);
         const writer = wrapFileToWriter(outputCif);
         const encoder = CifWriter.createEncoder({ binary: false });
-        await encode(inputStructure, input.cifFrame.header, categories, encoder, exportCtx, writer);
-        // clearLine();
+        encode(inputStructures[0], input.cifFrame.header, categories, encoder, exportCtx, writer);
         writer.end();
     }
 
     if (outputBcif) {
-        // ConsoleLogger.log(`${linearId}`, `Encoding BinaryCIF...`);
         const writer = wrapFileToWriter(outputBcif);
         const encoder = CifWriter.createEncoder({ binary: true, binaryAutoClassifyEncoding: true });
-        await encode(inputStructure, input.cifFrame.header, categories, encoder, exportCtx, writer);
-        //clearLine();
+        encode(inputStructures[0], input.cifFrame.header, categories, encoder, exportCtx, writer);
         writer.end();
     }
-    // ConsoleLogger.log(`${linearId}`, `Finished '${filename}' in ${Math.round(now() - started)}ms`);
 }
 
 function encode(structure: Structure, header: string, categories: CifWriter.Category[], encoder: CifWriter.Encoder, exportCtx: CifExportContext[], writer: Writer) {
-    return Task.create('Encode', async ctx => {
-        const skipCategoryNames = new Set<string>(categories.map(c => c.name));
-        encoder.startDataBlock(header);
-        // let current = 0;
-        for (const cat of categories){
-            encoder.writeCategory(cat);
-            // current++;
-            // if (ctx.shouldUpdate) await ctx.update({ message: 'Encoding...', current, max: categories.length });
-        }
-        encode_mmCIF_categories(encoder, structure, { skipCategoryNames, exportCtx });
-        encoder.encode();
-        encoder.writeTo(writer);
-    }).run();
+    const skipCategoryNames = new Set<string>(categories.map(c => c.name));
+    encoder.startDataBlock(header);
+    for (const cat of categories) {
+        encoder.writeCategory(cat);
+    }
+    encode_mmCIF_categories(encoder, structure, { skipCategoryNames, exportCtx });
+    encoder.encode();
+    encoder.writeTo(writer);
 }
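
A minimal usage sketch of the revised preprocessFile signature, assuming hypothetical provider source and file paths:

    import { preprocessFile } from './preprocess';
    import { createModelPropertiesProviderFromSources } from '../property-provider';

    async function run() {
        // './properties/example' is a placeholder; any module exporting
        // attachModelProperties can be listed here.
        const props = createModelPropertiesProviderFromSources(['./properties/example']);
        await preprocessFile('./data/1cbs.cif', props, './out/1cbs.cif', './out/1cbs.bcif');
    }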

+ 6 - 3
src/servers/model/property-provider.ts

@@ -10,10 +10,14 @@ import Config from './config';
 export type ModelPropertiesProvider = (model: Model, cache: object) => Promise<any>[]
 
 export function createModelPropertiesProviderFromConfig(): ModelPropertiesProvider {
-    if (!Config.customPropertyProviders || Config.customPropertyProviders.length === 0) return () => [];
+    return createModelPropertiesProviderFromSources(Config.customPropertyProviders);
+}
+
+export function createModelPropertiesProviderFromSources(sources: string[]): ModelPropertiesProvider {
+    if (!sources || sources.length === 0) return () => [];
 
     const ps: ModelPropertiesProvider[] = [];
-    for (const p of Config.customPropertyProviders) {
+    for (const p of sources) {
         ps.push(require(p).attachModelProperties);
     }
 
@@ -25,4 +29,3 @@ export function createModelPropertiesProviderFromConfig(): ModelPropertiesProvid
         return ret;
     }
 }
-
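
Each entry in customPropertyProviders is loaded with require(p) and is expected to export an attachModelProperties function matching the ModelPropertiesProvider type above. A minimal sketch of such a provider module (the property logic is a placeholder):

    import { Model } from 'mol-model/structure';

    // Must return one promise per custom property being attached to the model.
    export function attachModelProperties(model: Model, cache: object): Promise<any>[] {
        return [
            attachPlaceholderProperty(model, cache)
        ];
    }

    async function attachPlaceholderProperty(model: Model, cache: object) {
        // A real provider would fetch or compute its data here and register it
        // as a custom property on the model.
    }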

+ 8 - 3
src/servers/model/server/query.ts

@@ -16,7 +16,7 @@ import Version from '../version';
 import { Job } from './jobs';
 import { createStructureWrapperFromJob, StructureWrapper, resolveStructures } from './structure-wrapper';
 import CifField = CifWriter.Field
-import { createModelPropertiesProviderFromConfig } from '../property-provider';
+import { createModelPropertiesProviderFromConfig, ModelPropertiesProvider } from '../property-provider';
 
 export interface Stats {
     structure: StructureWrapper,
@@ -26,12 +26,17 @@ export interface Stats {
 
 const perf = new PerformanceMonitor();
 
-const propertyProvider = createModelPropertiesProviderFromConfig();
+let _propertyProvider: ModelPropertiesProvider;
+function propertyProvider() {
+    if (_propertyProvider) return _propertyProvider;
+    _propertyProvider = createModelPropertiesProviderFromConfig();
+    return _propertyProvider;
+}
 
 export async function resolveJob(job: Job): Promise<CifWriter.Encoder<any>> {
     ConsoleLogger.logId(job.id, 'Query', 'Starting.');
 
-    const wrappedStructure = await createStructureWrapperFromJob(job, propertyProvider);
+    const wrappedStructure = await createStructureWrapperFromJob(job, propertyProvider());
 
     try {
         perf.start('query');
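
The provider derived from the server config is now created lazily: the first propertyProvider() call runs createModelPropertiesProviderFromConfig (and therefore require()s each configured module), and subsequent calls reuse the cached instance. Presumably this keeps importing query.ts free of side effects for tools such as the preprocessor, which build their provider from the --cfg file instead of the server config.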