diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala index 7b49d746dd8ad385e1be716a36a1db20a834e7ed..550a36e0d931cac4ed79dc3113e474286d86f96b 100644 --- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala +++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala @@ -135,9 +135,15 @@ class Main { } }) - val archiveHandler = new ArchiveHandler(tempExtracted) - val unpacker = Flow.fromGraph(new ArchiveUnpackingFlow(archiveHandler)) - val cleanUp = Flow.fromGraph(new ArchiveCleanUpFlow(archiveHandler)) + val archiveHandler = new WholeZipArchiveHandler(new ArchiveHandler(tempExtracted)) + val unpacker = Flow.fromGraph(new MessageProcessorFlow[TreeScanSignal, TreeScanSignal] { + override val stageName = "Zip-Archive-Unpacker" + override val processor = new ArchiveUnpacker(archiveHandler) + }) + val cleanUp = Flow.fromGraph(new MessageProcessorFlow[FileParsingSignal, FileParsingSignal] { + override val stageName = "Zip-Archive-CleanUp" + override val processor = new ArchiveCleanUp(archiveHandler) + }) val parsing = CalculationParsingFlow.createParsingFlow( (1 to params.numWorkers).map(i => new CalculationParsingEngine(Main.parsers, metaInfo, eventProcessor, Some(f"Worker-$i%2d")))