From bfbc3fa61aaf70e35ffdce036732b8c303dd305f Mon Sep 17 00:00:00 2001
From: Arvid Ihrig <ihrig@fhi-berlin.mpg.de>
Date: Wed, 1 Aug 2018 12:23:39 +0200
Subject: [PATCH] Integrated Pipeline: refactored ArchiveHandling now used in
 main code
---
 .../eu/nomad_lab/integrated_pipeline/Main.scala      | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala
index 7b49d746..550a36e0 100644
--- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala
+++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala
@@ -135,9 +135,15 @@ class Main {
           }
         })
 
-        val archiveHandler = new ArchiveHandler(tempExtracted)
-        val unpacker = Flow.fromGraph(new ArchiveUnpackingFlow(archiveHandler))
-        val cleanUp = Flow.fromGraph(new ArchiveCleanUpFlow(archiveHandler))
+        val archiveHandler = new WholeZipArchiveHandler(new ArchiveHandler(tempExtracted))
+        val unpacker = Flow.fromGraph(new MessageProcessorFlow[TreeScanSignal, TreeScanSignal] {
+          override val stageName = "Zip-Archive-Unpacker"
+          override val processor = new ArchiveUnpacker(archiveHandler)
+        })
+        val cleanUp = Flow.fromGraph(new MessageProcessorFlow[FileParsingSignal, FileParsingSignal] {
+          override val stageName = "Zip-Archive-CleanUp"
+          override val processor = new ArchiveCleanUp(archiveHandler)
+        })
         val parsing = CalculationParsingFlow.createParsingFlow(
           (1 to params.numWorkers).map(i =>
             new CalculationParsingEngine(Main.parsers, metaInfo, eventProcessor, Some(f"Worker-$i%2d")))
-- 
GitLab