From b314247c7ff32aed86a7ea42a8bed11807029f99 Mon Sep 17 00:00:00 2001
From: Arvid Ihrig <ihrig@fhi-berlin.mpg.de>
Date: Mon, 25 Jun 2018 13:00:44 +0200
Subject: [PATCH] Integrated Pipeline: CalculationParsingFlow extracts
 to-be-processed rawdata archives to a temporary directory

-intended to be used with a RAMdisk filesystem in production, thus the entire archive (at most 30 GB)
 is extracted once and shared by all workers instead of extracting worker-private copies of the required
 file-trees
---
 .../eu/nomad_lab/integrated_pipeline/Main.scala      |  7 +++++--
 .../stream_components/CalculationParsingFlow.scala   |  3 ++-
 .../CalculationParsingFlowSpec.scala                 | 12 +++++++++++-
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala
index c1ae3693..b4d2c523 100644
--- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala
+++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/Main.scala
@@ -15,7 +15,7 @@
  */
 package eu.nomad_lab.integrated_pipeline
 
-import java.nio.file.Paths
+import java.nio.file.{ Files, Paths }
 import java.util.concurrent.LinkedBlockingQueue
 
 import akka.actor._
@@ -111,6 +111,8 @@ class Main {
     val mode = OutputType.withName(params.flatMap(Main.outputModeRe.findFirstMatchIn(_)).head.group(1))
     val outDir = Paths.get(params.flatMap(Main.outputDirRe.findFirstMatchIn(_)).head.group(1))
     val sinkSummaries: Sink[FileTreeParsingResult, Future[Done]] = Sink.ignore
+    val tempExtracted = Files.createTempDirectory("parsing-extracted-files")
+    tempExtracted.toFile.deleteOnExit()
 
     val graph = RunnableGraph.fromGraph(
       GraphDSL.create(sinkSummaries) { implicit builder => (sink) =>
@@ -121,7 +123,8 @@ class Main {
           case TreeType.Directory => new DirectoryTreeParserFlow
         })
         val parsing = builder.add(new CalculationParsingFlow(
-          (1 to 1).map(_ => new CalculationParsingEngine(parsers))
+          (1 to 1).map(_ => new CalculationParsingEngine(parsers)),
+          new ArchiveHandler(tempExtracted)
         ))
         val processor = builder.add(mode match {
           case OutputType.Json => new WriteToJsonResultsProcessorFlow(outDir)
diff --git a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/CalculationParsingFlow.scala b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/CalculationParsingFlow.scala
index de744222..108d9c1a 100644
--- a/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/CalculationParsingFlow.scala
+++ b/integrated-pipeline/src/main/scala/eu/nomad_lab/integrated_pipeline/stream_components/CalculationParsingFlow.scala
@@ -9,7 +9,7 @@ import eu.nomad_lab.integrated_pipeline.messages._
 
 import scala.collection.mutable
 
-class CalculationParsingFlow(engines: Seq[CalculationParsingEngine])
+class CalculationParsingFlow(engines: Seq[CalculationParsingEngine], archiveHandler: ArchiveHandler)
     extends GraphStage[FlowShape[FileParsingTaskSignal, FileParsingResultSignal]] {
 
   require(engines.nonEmpty, "must have at least one parsing engine")
@@ -38,6 +38,7 @@ class CalculationParsingFlow(engines: Seq[CalculationParsingEngine])
           case signal: FileParsingTaskStartTree =>
             require(!pathMap.contains(signal.treeTask), "file tree already started")
             pathMap(signal.treeTask) = signal.treeTask.treeType match {
+              case TreeType.Zip => archiveHandler.extractZipArchive(signal.treeTask.treeBasePath)
               case TreeType.Directory => signal.treeTask.treeBasePath
             }
             push(out, FileParsingResultStartTree(signal.treeTask))
diff --git a/integrated-pipeline/src/test/scala/integrated_pipeline_tests/CalculationParsingFlowSpec.scala b/integrated-pipeline/src/test/scala/integrated_pipeline_tests/CalculationParsingFlowSpec.scala
index 5bc55721..ead05885 100644
--- a/integrated-pipeline/src/test/scala/integrated_pipeline_tests/CalculationParsingFlowSpec.scala
+++ b/integrated-pipeline/src/test/scala/integrated_pipeline_tests/CalculationParsingFlowSpec.scala
@@ -47,6 +47,7 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar {
       })
       myMock
     }
+    val dummyArchiveHandler = mock[ArchiveHandler]
 
     private val testInput = TestSource.probe[FileParsingTaskSignal]
     private val testRequests = TestSink.probe[FileParsingResultSignal]
@@ -54,7 +55,9 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar {
       GraphDSL.create(testInput, testRequests)((_, _)) { implicit builder => (source, sink) =>
         import GraphDSL.Implicits._
 
-        val calculationParser = builder.add(new CalculationParsingFlow(dummyWorkers))
+        val calculationParser = builder.add(
+          new CalculationParsingFlow(dummyWorkers, dummyArchiveHandler)
+        )
 
         source ~> calculationParser ~> sink
         ClosedShape
@@ -128,6 +131,13 @@ class CalculationParsingFlowSpec extends WordSpec with MockitoSugar {
         probeRequests.expectComplete()
       }
 
+      "unpack a zip archive when the start tree signal arrives" in new GraphWithDummyWorkers(1) {
+        probeInput.sendNext(FileParsingTaskStartTree(sampleTreeScan.copy(treeType = TreeType.Zip)))
+        probeRequests.ensureSubscription().request(1)
+        probeRequests.expectNext()
+        verify(dummyArchiveHandler).extractZipArchive(sampleTreeScan.treeBasePath)
+      }
+
     }
   }
 
-- 
GitLab